Skip to content

Commit

Permalink
Add property modifying scheduler.
Browse files Browse the repository at this point in the history
In order to avoid modifying the Goma executable, add a wrapping scheduler
that is able to modify the properties that are passed in from it.
  • Loading branch information
chrisstaite-menlo committed Jul 20, 2023
1 parent f0a526b commit 656e7f7
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 1 deletion.
37 changes: 37 additions & 0 deletions cas/scheduler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,42 @@ rust_test(
],
)

rust_library(
name = "property_modifier_scheduler",
srcs = ["property_modifier_scheduler.rs"],
visibility = [
"//cas:__pkg__",
"//cas:__subpackages__",
],
proc_macro_deps = ["@crate_index//:async-trait"],
deps = [
":action_messages",
":platform_property_manager",
":scheduler",
"//config",
"//util:error",
"@crate_index//:tokio",
],
)

rust_test(
name = "property_modifier_scheduler_test",
srcs = ["tests/property_modifier_scheduler_test.rs"],
deps = [
":action_messages",
":property_modifier_scheduler",
":mock_scheduler",
":platform_property_manager",
":scheduler",
":scheduler_utils",
"//config",
"//util:common",
"//util:error",
"@crate_index//:pretty_assertions",
"@crate_index//:tokio",
],
)

rust_library(
name = "grpc_scheduler",
srcs = ["grpc_scheduler.rs"],
Expand Down Expand Up @@ -172,6 +208,7 @@ rust_library(
deps = [
":cache_lookup_scheduler",
":grpc_scheduler",
":property_modifier_scheduler",
":scheduler",
":simple_scheduler",
"//cas/store",
Expand Down
15 changes: 14 additions & 1 deletion cas/scheduler/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use cache_lookup_scheduler::CacheLookupScheduler;
use config::schedulers::SchedulerConfig;
use error::{Error, ResultExt};
use grpc_scheduler::GrpcScheduler;
use property_modifier_scheduler::PropertyModifierScheduler;
use scheduler::{ActionScheduler, WorkerScheduler};
use simple_scheduler::SimpleScheduler;
use store::StoreManager;
Expand All @@ -47,14 +48,26 @@ pub fn scheduler_factory<'a>(
let ac_store = store_manager
.get_store(&config.ac_store)
.err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?;
let (action_scheduler, worker_scheduler) = scheduler_factory(&config.scheduler, store_manager).await?;
let (action_scheduler, worker_scheduler) = scheduler_factory(&config.scheduler, store_manager)
.await
.err_tip(|| "In nested CacheLookupScheduler construction")?;
let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
cas_store,
ac_store,
action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
)?);
(Some(cache_lookup_scheduler), worker_scheduler)
}
SchedulerConfig::property_modifier(config) => {
let (action_scheduler, worker_scheduler) = scheduler_factory(&config.scheduler, store_manager)
.await
.err_tip(|| "In nested PropertyModifierScheduler construction")?;
let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
config,
action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
));
(Some(property_modifier_scheduler), worker_scheduler)
}
};

if let Some(action_scheduler) = &scheduler.0 {
Expand Down
75 changes: 75 additions & 0 deletions cas/scheduler/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2023 The Turbo Cache Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::watch;

use action_messages::{ActionInfo, ActionInfoHashKey, ActionState};
use config::schedulers::PropertyModification;
use error::{Error, ResultExt};
use platform_property_manager::PlatformPropertyManager;
use scheduler::ActionScheduler;

pub struct PropertyModifierScheduler {
modifications: Vec<config::schedulers::PropertyModification>,
scheduler: Arc<dyn ActionScheduler>,
}

impl PropertyModifierScheduler {
pub fn new(config: &config::schedulers::PropertyModifierScheduler, scheduler: Arc<dyn ActionScheduler>) -> Self {
Self {
modifications: config.modifications.clone(),
scheduler,
}
}
}

#[async_trait]
impl ActionScheduler for PropertyModifierScheduler {
async fn get_platform_property_manager(&self, instance_name: &str) -> Result<Arc<PlatformPropertyManager>, Error> {
self.scheduler.get_platform_property_manager(instance_name).await
}

async fn add_action(&self, mut action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
let platform_property_manager = self
.get_platform_property_manager(&action_info.unique_qualifier.instance_name)
.await
.err_tip(|| "In PropertyModifierScheduler::add_action")?;
for modification in &self.modifications {
match modification {
PropertyModification::Add(addition) => action_info.platform_properties.properties.insert(
addition.name.clone(),
platform_property_manager
.make_prop_value(&addition.name, &addition.value)
.err_tip(|| "In PropertyModifierScheduler::add_action")?,
),
PropertyModification::Remove(name) => action_info.platform_properties.properties.remove(name),
};
}
self.scheduler.add_action(action_info).await
}

async fn find_existing_action(
&self,
unique_qualifier: &ActionInfoHashKey,
) -> Option<watch::Receiver<Arc<ActionState>>> {
self.scheduler.find_existing_action(unique_qualifier).await
}

async fn clean_recently_completed_actions(&self) {
self.scheduler.clean_recently_completed_actions().await
}
}
Loading

0 comments on commit 656e7f7

Please sign in to comment.