Skip to content

Commit

Permalink
Fix bug on output_files' folders were not being created
Browse files Browse the repository at this point in the history
`output_paths` were being created, but not `output_files`. This patch
fixes this discrepancy.
  • Loading branch information
allada committed Jun 17, 2022
1 parent 4e51b6d commit bb010f2
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 2 deletions.
15 changes: 13 additions & 2 deletions cas/worker/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
/// a consumption of `self`, meaning once a return happens here the lifetime of `Self`
/// is over and any action performed on it after this call is undefined behavior.
async fn get_finished_result(self: Arc<Self>) -> Result<ActionResult, Error>;

/// Returns the work directory of the action.
fn get_work_directory(&self) -> &String;
}

struct RunningActionImplExecutionResult {
Expand Down Expand Up @@ -441,7 +444,6 @@ impl RunningAction for RunningActionImpl {
let (command, _) = try_join(command_fut, download_to_directory_fut).await?;
command
};
log::info!("\x1b[0;31mWorker Received Command\x1b[0m: {:?}", command);
{
// Create all directories needed for our output paths. This is required by the bazel spec.
let prepare_output_directories = |output_file| {
Expand All @@ -456,8 +458,10 @@ impl RunningAction for RunningActionImpl {
Result::<(), Error>::Ok(())
}
};
try_join_all(command.output_files.iter().map(prepare_output_directories)).await?;
try_join_all(command.output_paths.iter().map(prepare_output_directories)).await?;
}
log::info!("\x1b[0;31mWorker Received Command\x1b[0m: {:?}", command);
{
let mut state = self.state.lock().await;
state.command_proto = Some(command);
Expand All @@ -483,6 +487,7 @@ impl RunningAction for RunningActionImpl {
if args.len() < 1 {
return Err(make_input_err!("No arguments provided in Command proto"));
}
log::info!("\x1b[0;31mWorker Executing\x1b[0m: {:?}", &args);
let mut command_builder = process::Command::new(&args[0]);
command_builder
.args(&args[1..])
Expand Down Expand Up @@ -555,6 +560,7 @@ impl RunningAction for RunningActionImpl {
}

async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
log::info!("\x1b[0;31mWorker Uploading Results\x1b[0m");
let (mut command_proto, execution_result) = {
let mut state = self.state.lock().await;
(
Expand Down Expand Up @@ -739,6 +745,7 @@ impl RunningAction for RunningActionImpl {
}

async fn cleanup(self: Arc<Self>) -> Result<Arc<Self>, Error> {
log::info!("\x1b[0;31mWorker Cleanup\x1b[0m");
// Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
let remove_dir_result = fs::remove_dir_all(&self.work_directory)
.await
Expand All @@ -757,6 +764,10 @@ impl RunningAction for RunningActionImpl {
.take()
.err_tip(|| "Expected action_result to exist in get_finished_result")
}

fn get_work_directory(&self) -> &String {
&self.work_directory
}
}

#[async_trait]
Expand Down Expand Up @@ -789,7 +800,7 @@ impl RunningActionsManagerImpl {
.clone()
.as_any()
.downcast_ref::<Arc<FilesystemStore>>()
.err_tip(|| "Expected fast slow store for cas_store in RunningActionsManagerImpl")?
.err_tip(|| "Expected FilesystemStore store for .fast_store() in RunningActionsManagerImpl")?
.clone();
Ok(Self {
root_work_directory,
Expand Down
70 changes: 70 additions & 0 deletions cas/worker/tests/running_actions_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,76 @@ mod running_actions_manager_tests {
Ok(())
}

#[tokio::test]
async fn ensure_output_files_full_directories_are_created_test() -> Result<(), Box<dyn std::error::Error>> {
let (_, _, cas_store) = setup_stores().await?;
let root_work_directory = make_temp_path("root_work_directory");
fs::create_dir_all(&root_work_directory).await?;
let running_actions_manager = Arc::new(RunningActionsManagerImpl::new(
root_work_directory,
Pin::into_inner(cas_store.clone()),
)?);
const WORKER_ID: &str = "foo_worker_id";
{
const SALT: u64 = 55;
let command = Command {
arguments: vec!["touch".to_string(), "./some/path/test.txt".to_string()],
output_files: vec!["some/path/test.txt".to_string()],
working_directory: "some_cwd".to_string(),
..Default::default()
};
let command_digest = serialize_and_upload_message(&command, cas_store.as_ref()).await?;
let input_root_digest = serialize_and_upload_message(
&Directory {
directories: vec![DirectoryNode {
name: "some_cwd".to_string(),
digest: Some(
serialize_and_upload_message(&Directory::default(), cas_store.as_ref())
.await?
.into(),
),
}],
..Default::default()
},
cas_store.as_ref(),
)
.await?;
let action = Action {
command_digest: Some(command_digest.into()),
input_root_digest: Some(input_root_digest.into()),
..Default::default()
};
let action_digest = serialize_and_upload_message(&action, cas_store.as_ref()).await?;

let running_action = running_actions_manager
.create_and_add_action(
WORKER_ID.to_string(),
StartExecute {
execute_request: Some(ExecuteRequest {
action_digest: Some(action_digest.into()),
..Default::default()
}),
salt: SALT,
},
)
.await?;

let running_action = running_action.clone().prepare_action().await?;

// The folder should have been created for our output file.
assert_eq!(
fs::metadata(format!("{}/{}", running_action.get_work_directory(), "some/path"))
.await
.is_ok(),
true,
"Expected path to exist"
);

running_action.cleanup().await?;
};
Ok(())
}

#[tokio::test]
async fn upload_files_from_above_cwd_test() -> Result<(), Box<dyn std::error::Error>> {
let (_, slow_store, cas_store) = setup_stores().await?;
Expand Down
4 changes: 4 additions & 0 deletions cas/worker/tests/utils/mock_running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,8 @@ impl RunningAction for MockRunningAction {
),
}
}

fn get_work_directory(&self) -> &String {
unreachable!();
}
}

0 comments on commit bb010f2

Please sign in to comment.