Skip to content

Commit

Permalink
Remove futures from scheduler_factory (#336)
Browse files Browse the repository at this point in the history
If we fail to create a scheduler we can't continue.
  • Loading branch information
aaronmondal committed Oct 24, 2023
1 parent 6c937da commit f15146d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 74 deletions.
3 changes: 1 addition & 2 deletions cas/cas_main.rs
Expand Up @@ -95,8 +95,7 @@ async fn inner_main(cfg: CasConfig, server_start_timestamp: u64) -> Result<(), B
let scheduler_metrics = root_scheduler_metrics.sub_registry_with_prefix(&name);
let (maybe_action_scheduler, maybe_worker_scheduler) =
scheduler_factory(&scheduler_cfg, &store_manager, scheduler_metrics)
.await
.err_tip(|| format!("Failed to create scheduler '{}'", name))?;
.err_tip(|| format!("Failed to create scheduler '{name}'"))?;
if let Some(action_scheduler) = maybe_action_scheduler {
action_schedulers.insert(name.clone(), action_scheduler);
}
Expand Down
137 changes: 65 additions & 72 deletions cas/scheduler/default_scheduler_factory.rs
Expand Up @@ -13,11 +13,9 @@
// limitations under the License.

use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use tokio::time::interval;

use cache_lookup_scheduler::CacheLookupScheduler;
Expand All @@ -32,10 +30,10 @@ use store::StoreManager;

pub type SchedulerFactoryResults = (Option<Arc<dyn ActionScheduler>>, Option<Arc<dyn WorkerScheduler>>);

pub async fn scheduler_factory<'a>(
scheduler_type_cfg: &'a SchedulerConfig,
store_manager: &'a StoreManager,
scheduler_metrics: &'a mut Registry,
pub fn scheduler_factory(
scheduler_type_cfg: &SchedulerConfig,
store_manager: &StoreManager,
scheduler_metrics: &mut Registry,
) -> Result<SchedulerFactoryResults, Error> {
let mut visited_schedulers = HashSet::new();
inner_scheduler_factory(
Expand All @@ -44,80 +42,75 @@ pub async fn scheduler_factory<'a>(
Some(scheduler_metrics),
&mut visited_schedulers,
)
.await
}

fn inner_scheduler_factory<'a>(
scheduler_type_cfg: &'a SchedulerConfig,
store_manager: &'a StoreManager,
maybe_scheduler_metrics: Option<&'a mut Registry>,
visited_schedulers: &'a mut HashSet<usize>,
) -> Pin<Box<dyn Future<Output = Result<SchedulerFactoryResults, Error>> + 'a>> {
Box::pin(async move {
let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
SchedulerConfig::simple(config) => {
let scheduler = Arc::new(SimpleScheduler::new(config));
(Some(scheduler.clone()), Some(scheduler))
}
SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
SchedulerConfig::cache_lookup(config) => {
let cas_store = store_manager
.get_store(&config.cas_store)
.err_tip(|| format!("'cas_store': '{}' does not exist", config.cas_store))?;
let ac_store = store_manager
.get_store(&config.ac_store)
.err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?;
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&config.scheduler, store_manager, None, visited_schedulers)
.await
.err_tip(|| "In nested CacheLookupScheduler construction")?;
let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
cas_store,
ac_store,
action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
)?);
(Some(cache_lookup_scheduler), worker_scheduler)
}
SchedulerConfig::property_modifier(config) => {
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&config.scheduler, store_manager, None, visited_schedulers)
.await
.err_tip(|| "In nested PropertyModifierScheduler construction")?;
let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
config,
action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
));
(Some(property_modifier_scheduler), worker_scheduler)
}
};
fn inner_scheduler_factory(
scheduler_type_cfg: &SchedulerConfig,
store_manager: &StoreManager,
maybe_scheduler_metrics: Option<&mut Registry>,
visited_schedulers: &mut HashSet<usize>,
) -> Result<SchedulerFactoryResults, Error> {
let scheduler: SchedulerFactoryResults = match scheduler_type_cfg {
SchedulerConfig::simple(config) => {
let scheduler = Arc::new(SimpleScheduler::new(config));
(Some(scheduler.clone()), Some(scheduler))
}
SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
SchedulerConfig::cache_lookup(config) => {
let cas_store = store_manager
.get_store(&config.cas_store)
.err_tip(|| format!("'cas_store': '{}' does not exist", config.cas_store))?;
let ac_store = store_manager
.get_store(&config.ac_store)
.err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?;
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&config.scheduler, store_manager, None, visited_schedulers)
.err_tip(|| "In nested CacheLookupScheduler construction")?;
let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
cas_store,
ac_store,
action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
)?);
(Some(cache_lookup_scheduler), worker_scheduler)
}
SchedulerConfig::property_modifier(config) => {
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&config.scheduler, store_manager, None, visited_schedulers)
.err_tip(|| "In nested PropertyModifierScheduler construction")?;
let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
config,
action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
));
(Some(property_modifier_scheduler), worker_scheduler)
}
};

if let Some(scheduler_metrics) = maybe_scheduler_metrics {
if let Some(action_scheduler) = &scheduler.0 {
start_cleanup_timer(action_scheduler);
// We need a way to prevent our scheduler form having `register_metrics()` called multiple times.
// This is the equivalent of grabbing a uintptr_t in C++, storing it in a set, and checking if it's
// already been visited. We can't use the Arc's pointer directly because it has two interfaces
// (ActionScheduler and WorkerScheduler) and we need to be able to know if the underlying scheduler
// has already been visited, not just the trait. `Any` could be used, but that'd require some rework
// of all the schedulers. This is the most simple way to do it. Rust's uintptr_t is usize.
let action_scheduler_uintptr: usize = Arc::as_ptr(action_scheduler) as *const () as usize;
if !visited_schedulers.contains(&action_scheduler_uintptr) {
visited_schedulers.insert(action_scheduler_uintptr);
action_scheduler.clone().register_metrics(scheduler_metrics);
}
if let Some(scheduler_metrics) = maybe_scheduler_metrics {
if let Some(action_scheduler) = &scheduler.0 {
start_cleanup_timer(action_scheduler);
// We need a way to prevent our scheduler form having `register_metrics()` called multiple times.
// This is the equivalent of grabbing a uintptr_t in C++, storing it in a set, and checking if it's
// already been visited. We can't use the Arc's pointer directly because it has two interfaces
// (ActionScheduler and WorkerScheduler) and we need to be able to know if the underlying scheduler
// has already been visited, not just the trait. `Any` could be used, but that'd require some rework
// of all the schedulers. This is the most simple way to do it. Rust's uintptr_t is usize.
let action_scheduler_uintptr: usize = Arc::as_ptr(action_scheduler).cast::<()>() as usize;
if !visited_schedulers.contains(&action_scheduler_uintptr) {
visited_schedulers.insert(action_scheduler_uintptr);
action_scheduler.clone().register_metrics(scheduler_metrics);
}
if let Some(worker_scheduler) = &scheduler.1 {
let worker_scheduler_uintptr: usize = Arc::as_ptr(worker_scheduler) as *const () as usize;
if !visited_schedulers.contains(&worker_scheduler_uintptr) {
visited_schedulers.insert(worker_scheduler_uintptr);
worker_scheduler.clone().register_metrics(scheduler_metrics);
}
}
if let Some(worker_scheduler) = &scheduler.1 {
let worker_scheduler_uintptr: usize = Arc::as_ptr(worker_scheduler).cast::<()>() as usize;
if !visited_schedulers.contains(&worker_scheduler_uintptr) {
visited_schedulers.insert(worker_scheduler_uintptr);
worker_scheduler.clone().register_metrics(scheduler_metrics);
}
worker_scheduler.clone().register_metrics(scheduler_metrics);
}
}

Ok(scheduler)
})
Ok(scheduler)
}

fn start_cleanup_timer(action_scheduler: &Arc<dyn ActionScheduler>) {
Expand Down

0 comments on commit f15146d

Please sign in to comment.