Skip to content

Commit

Permalink
Fix bug where worker was not creating working directory properly
Browse files Browse the repository at this point in the history
In some conditions local workers were not creating the work
directory.
  • Loading branch information
allada committed Jun 17, 2022
1 parent c4be423 commit 4e51b6d
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
1 change: 1 addition & 0 deletions cas/cas_main.rs
Expand Up @@ -105,6 +105,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)
})?;
let local_worker = new_local_worker(Arc::new(local_worker_cfg), fast_slow_store.clone())
.await
.err_tip(|| "Could not make LocalWorker")?;
tokio::spawn(local_worker.run())
}
Expand Down
5 changes: 5 additions & 0 deletions cas/worker/BUILD
Expand Up @@ -152,15 +152,20 @@ rust_test(
deps = [
"//cas/scheduler:action_messages",
"//cas/scheduler:platform_property_manager",
"//cas/store:fast_slow_store",
"//cas/store:filesystem_store",
"//cas/store:memory_store",
"//config",
"//proto",
"//third_party:env_logger",
"//third_party:pretty_assertions",
"//third_party:prost",
"//third_party:rand",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
":local_worker",
":local_worker_test_utils",
":mock_running_actions_manager",
],
Expand Down
9 changes: 7 additions & 2 deletions cas/worker/local_worker.rs
Expand Up @@ -11,7 +11,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{transport::Channel as TonicChannel, Streaming};

use action_messages::{ActionResult, ActionStage};
use common::log;
use common::{fs, log};
use config::cas_server::LocalWorkerConfig;
use error::{make_err, make_input_err, Code, Error, ResultExt};
use fast_slow_store::FastSlowStore;
Expand Down Expand Up @@ -203,7 +203,7 @@ pub struct LocalWorker<T: WorkerApiClientTrait, U: RunningActionsManager> {

/// Creates a new LocalWorker. The `cas_store` must be an instance of FastSlowStore and will be
/// checked at runtime.
pub fn new_local_worker(
pub async fn new_local_worker(
config: Arc<LocalWorkerConfig>,
cas_store: Arc<dyn Store>,
) -> Result<LocalWorker<WorkerApiClientWrapper, RunningActionsManagerImpl>, Error> {
Expand All @@ -212,6 +212,11 @@ pub fn new_local_worker(
.downcast_ref::<Arc<FastSlowStore>>()
.err_tip(|| "Expected store for LocalWorker's store to be a FastSlowStore")?
.clone();

fs::create_dir_all(&config.work_directory)
.await
.err_tip(|| format!("Could not make work_directory : {}", config.work_directory))?;

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(
config.work_directory.clone(),
fast_slow_store,
Expand Down
57 changes: 55 additions & 2 deletions cas/worker/tests/local_worker_test.rs
@@ -1,17 +1,23 @@
// Copyright 2022 Nathan (Blaise) Bruer. All rights reserved.

use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use prost::Message;
use rand::{thread_rng, Rng};
use tonic::Response;

use action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ExecutionMetadata};
use common::{encode_stream_proto, DigestInfo};
use config::cas_server::WrokerProperty;
use common::{encode_stream_proto, fs, DigestInfo};
use config::cas_server::{LocalWorkerConfig, WrokerProperty};
use error::{make_input_err, Error};
use fast_slow_store::FastSlowStore;
use filesystem_store::FilesystemStore;
use local_worker::new_local_worker;
use local_worker_test_utils::{setup_grpc_stream, setup_local_worker};
use memory_store::MemoryStore;
use mock_running_actions_manager::MockRunningAction;
use platform_property_manager::PlatformProperties;
use proto::build::bazel::remote::execution::v2::platform::Property;
Expand All @@ -20,6 +26,17 @@ use proto::com::github::allada::turbo_cache::remote_execution::{
SupportedProperties, UpdateForWorker,
};

/// Get temporary path from either `TEST_TMPDIR` or best effort temp directory if
/// not set.
fn make_temp_path(data: &str) -> String {
format!(
"{}/{}/{}",
env::var("TEST_TMPDIR").unwrap_or(env::temp_dir().to_str().unwrap().to_string()),
thread_rng().gen::<u64>(),
data
)
}

#[cfg(test)]
mod local_worker_tests {
use super::*;
Expand Down Expand Up @@ -215,4 +232,40 @@ mod local_worker_tests {

Ok(())
}

#[tokio::test]
async fn new_local_worker_creates_work_directory_test() -> Result<(), Box<dyn std::error::Error>> {
let cas_store = Arc::new(FastSlowStore::new(
&config::backends::FastSlowStore {
// Note: These are not needed for this test, so we put dummy memory stores here.
fast: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
slow: config::backends::StoreConfig::memory(config::backends::MemoryStore::default()),
},
Arc::new(
FilesystemStore::new(&config::backends::FilesystemStore {
content_path: make_temp_path("content_path"),
temp_path: make_temp_path("temp_path"),
..Default::default()
})
.await?,
),
Arc::new(MemoryStore::new(&config::backends::MemoryStore::default())),
));
let work_directory = make_temp_path("foo");
new_local_worker(
Arc::new(LocalWorkerConfig {
work_directory: work_directory.clone(),
..Default::default()
}),
cas_store,
)
.await?;

assert!(
fs::metadata(work_directory).await.is_ok(),
"Expected work_directory to be created"
);

Ok(())
}
}

0 comments on commit 4e51b6d

Please sign in to comment.