Commit 66c403d

Minor optimizations and documentation to CacheLookupScheduler
allada committed Sep 13, 2023
1 parent 06c03de commit 66c403d
Showing 5 changed files with 40 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions cas/scheduler/BUILD
@@ -111,6 +111,7 @@ rust_library(
         "//proto",
         "//util:common",
         "//util:error",
+        "@crate_index//:drop_guard",
         "@crate_index//:futures",
         "@crate_index//:parking_lot",
         "@crate_index//:tonic",
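
Note: the new dependency is the drop_guard crate, whose `guard(value, closure)` helper (used by this commit in cache_lookup_scheduler.rs below) runs the closure on the value exactly once when the returned guard is dropped. A minimal, self-contained sketch of that behavior, illustrative only and not part of this commit:

    use drop_guard::guard;

    fn main() {
        let _g = guard(String::from("temp-entry"), |key| {
            // Runs when `_g` is dropped, including on early return or unwind.
            println!("cleaning up {key}");
        });
        println!("doing work");
    } // `_g` drops here; "cleaning up temp-entry" prints now
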
39 changes: 24 additions & 15 deletions cas/scheduler/cache_lookup_scheduler.rs
@@ -17,6 +17,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use drop_guard::guard;
 use futures::stream::StreamExt;
 use tokio::select;
 use tokio::sync::watch;
@@ -55,7 +56,7 @@ pub struct CacheLookupScheduler {
 }
 
 async fn get_action_from_store(
-    ac_store: Arc<dyn Store>,
+    ac_store: &Arc<dyn Store>,
     action_digest: &DigestInfo,
     instance_name: String,
 ) -> Option<ProtoActionResult> {
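
Note: `get_action_from_store` (above) and `validate_outputs_exist` (below) now borrow the store as `&Arc<dyn Store>`, so callers lend their handle instead of bumping the atomic refcount on every call. A hypothetical illustration of the difference, not taken from this codebase:

    use std::sync::Arc;

    fn by_value(s: Arc<str>) -> usize { s.len() } // caller must clone (refcount bump)
    fn by_ref(s: &Arc<str>) -> usize { s.len() }  // caller just lends the handle

    fn main() {
        let s: Arc<str> = Arc::from("store handle");
        let _ = by_value(s.clone()); // clone needed to keep using `s` afterwards
        let _ = by_ref(&s);          // no refcount traffic
    }
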
@@ -83,19 +84,19 @@ async fn get_action_from_store(
     }
 }
 
-async fn validate_outputs_exist(cas_store: Arc<dyn Store>, action_result: &ProtoActionResult) -> bool {
+async fn validate_outputs_exist(cas_store: &Arc<dyn Store>, action_result: &ProtoActionResult) -> bool {
     // Verify that output_files and output_directories are available in the cas.
     let mut required_digests =
         Vec::with_capacity(action_result.output_files.len() + action_result.output_directories.len());
     for digest in action_result
         .output_files
         .iter()
-        .filter_map(|output_file| output_file.digest.clone())
+        .filter_map(|output_file| output_file.digest.as_ref())
         .chain(
             action_result
                 .output_directories
                 .iter()
-                .filter_map(|output_directory| output_directory.tree_digest.clone()),
+                .filter_map(|output_directory| output_directory.tree_digest.as_ref()),
         )
     {
         let Ok(digest) = DigestInfo::try_from(digest) else {
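
Note: switching `filter_map` from `.clone()` to `.as_ref()` borrows each protobuf digest instead of copying it; the borrowed conversion is what the new `TryFrom<&Digest> for DigestInfo` impl in util/common.rs (below) provides. The same pattern in a self-contained sketch with hypothetical types:

    struct OutputFile { digest: Option<[u8; 32]> }

    fn main() {
        let files = vec![OutputFile { digest: Some([0xab; 32]) }, OutputFile { digest: None }];
        // `.as_ref()` yields `&[u8; 32]`; `.clone()` would copy every hash.
        let borrowed: Vec<&[u8; 32]> = files.iter().filter_map(|f| f.digest.as_ref()).collect();
        assert_eq!(borrowed.len(), 1);
    }
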
@@ -157,29 +158,38 @@ impl ActionScheduler for CacheLookupScheduler {
         });
         let (tx, rx) = watch::channel(current_state.clone());
         let tx = Arc::new(tx);
-        {
+        let drop_guard = {
             let mut cache_check_actions = self.cache_check_actions.lock();
             // Check this isn't a duplicate request first.
             if let Some(rx) = subscribe_to_existing_action(&cache_check_actions, &action_info.unique_qualifier) {
                 return Ok(rx);
             }
             cache_check_actions.insert(action_info.unique_qualifier.clone(), tx.clone());
-        }
+            // In the event we lose the reference to our `drop_guard`, it will remove
+            // the action from the cache_check_actions map.
+            let cache_check_actions = self.cache_check_actions.clone();
+            let unique_qualifier = action_info.unique_qualifier.clone();
+            guard((), move |_| {
+                cache_check_actions.lock().remove(&unique_qualifier);
+            })
+        };
+
         let ac_store = self.ac_store.clone();
         let cas_store = self.cas_store.clone();
         let action_scheduler = self.action_scheduler.clone();
-        let cache_check_actions = self.cache_check_actions.clone();
+        // We need this spawn because we are returning a stream and this spawn will populate the stream's data.
         tokio::spawn(async move {
+            // If our spawn ever dies, we will remove the action from the cache_check_actions map.
+            let _drop_guard = drop_guard;
+
+            // Perform cache check.
+            let action_digest = current_state.action_digest();
             let instance_name = action_info.instance_name().clone();
-            let unique_qualifier = action_info.unique_qualifier.clone();
-            if let Some(proto_action_result) =
-                get_action_from_store(ac_store, current_state.action_digest(), instance_name.clone()).await
-            {
-                if validate_outputs_exist(cas_store, &proto_action_result).await {
+            if let Some(action_result) = get_action_from_store(&ac_store, action_digest, instance_name).await {
+                if validate_outputs_exist(&cas_store, &action_result).await {
                     // Found in the cache, return the result immediately.
-                    Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(proto_action_result);
+                    Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(action_result);
                     let _ = tx.send(current_state);
-                    cache_check_actions.lock().remove(&unique_qualifier);
                     return;
                 }
             }
@@ -208,7 +218,6 @@
                     let _ = tx.send(current_state);
                 }
             }
-            cache_check_actions.lock().remove(&unique_qualifier);
         });
         Ok(rx)
     }
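
Note: the drop guard replaces the two hand-written `remove()` calls that previously had to run on every exit path. Whether the spawned task hits the cache, falls through to the inner scheduler, or is dropped mid-flight, the `cache_check_actions` entry is now removed exactly once. A std-only sketch of the same RAII pattern, with hypothetical names:

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    struct RemoveOnDrop {
        map: Arc<Mutex<HashMap<String, ()>>>,
        key: String,
    }

    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            // Single cleanup point, covering early returns, normal exit, and panics.
            self.map.lock().unwrap().remove(&self.key);
        }
    }

    fn main() {
        let inflight = Arc::new(Mutex::new(HashMap::new()));
        inflight.lock().unwrap().insert("action-1".to_string(), ());
        let _guard = RemoveOnDrop { map: inflight.clone(), key: "action-1".to_string() };
        // ... early-return logic would go here ...
    } // `_guard` drops; the entry is removed no matter how we got here
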
1 change: 1 addition & 0 deletions gencargo/cache_lookup_scheduler/Cargo.toml
@@ -20,6 +20,7 @@ doctest = false
 
 [dependencies]
 async-trait = { workspace = true }
+drop_guard = { workspace = true }
 futures = { workspace = true }
 parking_lot = { workspace = true }
 tokio = { workspace = true }
13 changes: 13 additions & 0 deletions util/common.rs
@@ -120,6 +120,19 @@ impl TryFrom<Digest> for DigestInfo {
     }
 }
 
+impl TryFrom<&Digest> for DigestInfo {
+    type Error = Error;
+
+    fn try_from(digest: &Digest) -> Result<Self, Self::Error> {
+        let packed_hash =
+            <[u8; 32]>::from_hex(&digest.hash).err_tip(|| format!("Invalid sha256 hash: {}", digest.hash))?;
+        Ok(DigestInfo {
+            size_bytes: digest.size_bytes,
+            packed_hash,
+        })
+    }
+}
+
 impl From<DigestInfo> for Digest {
     fn from(val: DigestInfo) -> Self {
         Digest {
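
Note: this borrowed impl mirrors the existing by-value `TryFrom<Digest>`, so the `.as_ref()` call sites above can convert without cloning the proto. A self-contained analogue of a borrowing `TryFrom`, with hypothetical types standing in for Digest and DigestInfo:

    struct Wire { hash: String }
    struct Parsed { first_byte: u8 }

    impl TryFrom<&Wire> for Parsed {
        type Error = String;

        fn try_from(w: &Wire) -> Result<Self, Self::Error> {
            let hex = w.hash.get(0..2).ok_or_else(|| "hash too short".to_string())?;
            let b = u8::from_str_radix(hex, 16).map_err(|e| e.to_string())?;
            Ok(Parsed { first_byte: b })
        }
    }

    fn main() {
        let w = Wire { hash: "ab12".to_string() };
        let p = Parsed::try_from(&w).unwrap(); // `w` is only borrowed...
        assert_eq!(p.first_byte, 0xab);
        assert_eq!(w.hash, "ab12");            // ...so it is still usable here
    }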
