Skip to content

Commit

Permalink
Support deprecated symlink fields & fix bug for workers use CWD
Browse files Browse the repository at this point in the history
The documentation in the bazel remote execution proto is confusing.
It states that the output_paths will be relative to working directory
which was interpreted as CWD. This PR fixes this miss-understanding
and now files are relative to input_root.

Also, it turns out that bazel does not yet support 2.1, so we
are now populating the deprecated fields as well as the new fields.
  • Loading branch information
allada committed Apr 29, 2022
1 parent 420e7ee commit 00431f9
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 42 deletions.
3 changes: 3 additions & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ build:debug -c dbg
build:self_test --remote_instance_name=main
build:self_test --remote_cache=grpc://127.0.0.1:50051

build:self_execute --remote_executor=grpc://127.0.0.1:50051
build:self_execute --remote_instance_name=main

build --aspects=@rules_rust//rust:defs.bzl%rustfmt_aspect
test --@rules_rust//:rustfmt.toml=//:.rustfmt.toml
# This will make rustfmt only run on `bazel test`.
Expand Down
2 changes: 1 addition & 1 deletion cas/grpc_service/capabilities_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl Capabilities for CapabilitiesServer {
}),
high_api_version: Some(SemVer {
major: 2,
minor: 2,
minor: 0,
patch: 0,
prerelease: "".to_string(),
}),
Expand Down
6 changes: 5 additions & 1 deletion cas/grpc_service/tests/worker_api_server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,11 @@ pub mod execution_response_tests {
contents: Default::default(), // We don't implement this.
node_properties: None,
}],
output_file_symlinks: Default::default(), // Bazel deprecated this.
output_file_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
node_properties: None,
}],
output_symlinks: vec![OutputSymlink {
path: "some path3".to_string(),
target: "some target3".to_string(),
Expand Down
32 changes: 25 additions & 7 deletions cas/scheduler/action_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ impl TryFrom<ExecutedActionMetadata> for ExecutionMetadata {
pub struct ActionResult {
pub output_files: Vec<FileInfo>,
pub output_folders: Vec<DirectoryInfo>,
pub output_symlinks: Vec<SymlinkInfo>,
pub output_directory_symlinks: Vec<SymlinkInfo>,
pub output_file_symlinks: Vec<SymlinkInfo>,
pub exit_code: i32,
pub stdout_digest: DigestInfo,
pub stderr_digest: DigestInfo,
Expand Down Expand Up @@ -561,19 +562,33 @@ impl Into<ExecuteResponse> for ActionStage {
);
}

let mut output_symlinks = Vec::with_capacity(
action_result.output_file_symlinks.len() + action_result.output_directory_symlinks.len(),
);
output_symlinks.extend_from_slice(action_result.output_file_symlinks.as_slice());
output_symlinks.extend_from_slice(action_result.output_directory_symlinks.as_slice());

ExecuteResponse {
result: Some(ProtoActionResult {
output_files: action_result.output_files.into_iter().map(|v| v.into()).collect(),
output_symlinks: action_result.output_symlinks.into_iter().map(|v| v.into()).collect(),
output_file_symlinks: action_result
.output_file_symlinks
.into_iter()
.map(|v| v.into())
.collect(),
output_symlinks: output_symlinks.into_iter().map(|v| v.into()).collect(),
output_directories: action_result.output_folders.into_iter().map(|v| v.into()).collect(),
output_directory_symlinks: action_result
.output_directory_symlinks
.into_iter()
.map(|v| v.into())
.collect(),
exit_code: action_result.exit_code,
stdout_raw: Default::default(),
stdout_digest: Some(action_result.stdout_digest.into()),
stderr_raw: Default::default(),
stderr_digest: Some(action_result.stderr_digest.into()),
execution_metadata: Some(action_result.execution_metadata.into()),
output_directory_symlinks: Default::default(),
output_file_symlinks: Default::default(),
stdout_raw: Default::default(),
stderr_raw: Default::default(),
}),
cached_result: was_from_cache,
status: Some(error.map_or(Status::default(), |v| v.into())),
Expand All @@ -592,7 +607,10 @@ impl TryFrom<ExecuteResponse> for ActionStage {
.err_tip(|| "Expected result to be set on ExecuteResponse msg")?;
let action_result = ActionResult {
output_files: proto_action_result.output_files.try_map(|v| v.try_into())?,
output_symlinks: proto_action_result.output_symlinks.try_map(|v| v.try_into())?,
output_directory_symlinks: proto_action_result
.output_directory_symlinks
.try_map(|v| v.try_into())?,
output_file_symlinks: proto_action_result.output_file_symlinks.try_map(|v| v.try_into())?,
output_folders: proto_action_result.output_directories.try_map(|v| v.try_into())?,
exit_code: proto_action_result.exit_code,

Expand Down
3 changes: 2 additions & 1 deletion cas/scheduler/tests/action_messages_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ mod action_messages_tests {
let execute_response: ExecuteResponse = ActionStage::Completed(ActionResult {
output_files: vec![],
output_folders: vec![],
output_symlinks: vec![],
output_file_symlinks: vec![],
output_directory_symlinks: vec![],
exit_code: 0,
stdout_digest: DigestInfo::new([2u8; 32], 5),
stderr_digest: DigestInfo::new([3u8; 32], 5),
Expand Down
9 changes: 7 additions & 2 deletions cas/scheduler/tests/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,10 +477,14 @@ mod scheduler_tests {
path: "123".to_string(),
tree_digest: DigestInfo::new([9u8; 32], 100),
}],
output_symlinks: vec![SymlinkInfo {
output_file_symlinks: vec![SymlinkInfo {
name_or_path: NameOrPath::Name("foo".to_string()),
target: "bar".to_string(),
}],
output_directory_symlinks: vec![SymlinkInfo {
name_or_path: NameOrPath::Name("foo2".to_string()),
target: "bar2".to_string(),
}],
exit_code: 0,
stdout_digest: DigestInfo::new([6u8; 32], 19),
stderr_digest: DigestInfo::new([7u8; 32], 20),
Expand Down Expand Up @@ -554,7 +558,8 @@ mod scheduler_tests {
let action_result = ActionResult {
output_files: Default::default(),
output_folders: Default::default(),
output_symlinks: Default::default(),
output_file_symlinks: Default::default(),
output_directory_symlinks: Default::default(),
exit_code: 0,
stdout_digest: DigestInfo::new([6u8; 32], 19),
stderr_digest: DigestInfo::new([7u8; 32], 20),
Expand Down
72 changes: 50 additions & 22 deletions cas/worker/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,11 @@ impl RunningAction for RunningActionImpl {
let (command, _) = try_join(command_fut, download_to_directory_fut).await?;
command
};
log::info!("\x1b[0;31mWorker Received Command\x1b[0m: {:?}", command);
{
// Create all directories needed for our output paths. This is required by the bazel spec.
let full_work_directory = format!("{}/{}", self.work_directory, command.working_directory);
let prepare_output_directories = move |output_file| {
let full_output_path = format!("{}/{}", full_work_directory, output_file);
let prepare_output_directories = |output_file| {
let full_output_path = format!("{}/{}", self.work_directory, output_file);
async move {
let full_parent_path = Path::new(&full_output_path)
.parent()
Expand Down Expand Up @@ -555,7 +555,7 @@ impl RunningAction for RunningActionImpl {
}

async fn upload_results(self: Arc<Self>) -> Result<Arc<Self>, Error> {
let (command_proto, execution_result) = {
let (mut command_proto, execution_result) = {
let mut state = self.state.lock().await;
(
state
Expand Down Expand Up @@ -594,14 +594,20 @@ impl RunningAction for RunningActionImpl {
None,
File(FileInfo),
Directory(DirectoryInfo),
Symlink(SymlinkInfo),
FileSymlink(SymlinkInfo),
DirectorySymlink(SymlinkInfo),
}
let full_work_directory = format!("{}/{}", self.work_directory, command_proto.working_directory);

let mut output_path_futures = FuturesUnordered::new();
for entry in command_proto.output_paths.into_iter() {
let full_work_directory = &full_work_directory; // This ensures we don't move the value.
let full_path = format!("{}/{}", full_work_directory, entry);
let mut output_paths = command_proto.output_paths;
if output_paths.is_empty() {
output_paths.reserve(command_proto.output_files.len() + command_proto.output_directories.len());
output_paths.append(&mut command_proto.output_files);
output_paths.append(&mut command_proto.output_directories);
}
for entry in output_paths.into_iter() {
let full_path = format!("{}/{}", self.work_directory, entry);
let work_directory = &self.work_directory;
output_path_futures.push(async move {
let metadata = {
let file_handle = match fs::open_file(&full_path).await {
Expand Down Expand Up @@ -634,7 +640,7 @@ impl RunningAction for RunningActionImpl {
};
if metadata.is_dir() {
Ok(OutputType::Directory(
upload_directory(cas_store, full_path, full_work_directory)
upload_directory(cas_store, full_path, work_directory)
.and_then(|(root_dir, children)| async move {
let tree = ProtoTree {
root: Some(root_dir),
Expand All @@ -651,14 +657,31 @@ impl RunningAction for RunningActionImpl {
.await?,
))
} else if metadata.is_symlink() {
Ok(OutputType::Symlink(
upload_symlink(full_path, full_work_directory)
.await
.map(|mut symlink_info| {
symlink_info.name_or_path = NameOrPath::Path(entry);
symlink_info
})?,
))
let output_symlink = upload_symlink(&full_path, work_directory)
.await
.map(|mut symlink_info| {
symlink_info.name_or_path = NameOrPath::Path(entry);
symlink_info
})?;
match fs::metadata(&full_path).await {
Ok(metadata) => {
if metadata.is_dir() {
return Ok(OutputType::DirectorySymlink(output_symlink));
} else {
// Note: If it's anything but directory we put it as a file symlink.
return Ok(OutputType::FileSymlink(output_symlink));
}
}
Err(e) => {
if e.code != Code::NotFound {
return Err(e)
.err_tip(|| format!("While querying target symlink metadata for {}", full_path));
}
// If the file doesn't exist, we consider it a file. Even though the
// file doesn't exist we still need to populate an entry.
return Ok(OutputType::FileSymlink(output_symlink));
}
}
} else {
Err(make_err!(
Code::Internal,
Expand All @@ -670,25 +693,29 @@ impl RunningAction for RunningActionImpl {
}
let mut output_files = vec![];
let mut output_folders = vec![];
let mut output_symlinks = vec![];
let mut output_directory_symlinks = vec![];
let mut output_file_symlinks = vec![];
while let Some(output_type) = output_path_futures.try_next().await? {
match output_type {
OutputType::File(output_file) => output_files.push(output_file),
OutputType::Directory(output_folder) => output_folders.push(output_folder),
OutputType::Symlink(output_symlink) => output_symlinks.push(output_symlink),
OutputType::FileSymlink(output_symlink) => output_file_symlinks.push(output_symlink),
OutputType::DirectorySymlink(output_symlink) => output_directory_symlinks.push(output_symlink),
OutputType::None => { /* Safe to ignore */ }
}
}
drop(output_path_futures);
output_files.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
output_folders.sort_unstable_by(|a, b| a.path.cmp(&b.path));
output_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
output_file_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
output_directory_symlinks.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
{
let mut state = self.state.lock().await;
state.action_result = Some(ActionResult {
output_files,
output_folders,
output_symlinks,
output_directory_symlinks,
output_file_symlinks,
exit_code: execution_result.exit_code,
stdout_digest: stdout_digest.into(),
stderr_digest: stderr_digest.into(),
Expand Down Expand Up @@ -820,6 +847,7 @@ impl RunningActionsManager for RunningActionsManagerImpl {
start_execute: StartExecute,
) -> Result<Arc<RunningActionImpl>, Error> {
let action_info = self.create_action_info(start_execute).await?;
log::info!("\x1b[0;31mWorker Received Action\x1b[0m: {:?}", action_info);
let action_id = action_info.unique_qualifier.get_hash();
let work_directory = self.make_work_directory(&action_id).await?;
let running_action = Arc::new(RunningActionImpl::new(
Expand Down
3 changes: 2 additions & 1 deletion cas/worker/tests/local_worker_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ mod local_worker_tests {
let action_result = ActionResult {
output_files: vec![],
output_folders: vec![],
output_symlinks: vec![],
output_file_symlinks: vec![],
output_directory_symlinks: vec![],
exit_code: 5,
stdout_digest: DigestInfo::new([21u8; 32], 10),
stderr_digest: DigestInfo::new([22u8; 32], 10),
Expand Down
31 changes: 24 additions & 7 deletions cas/worker/tests/running_actions_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ mod running_actions_manager_tests {
}

#[tokio::test]
async fn upload_files_test() -> Result<(), Box<dyn std::error::Error>> {
async fn upload_files_from_above_cwd_test() -> Result<(), Box<dyn std::error::Error>> {
let (_, slow_store, cas_store) = setup_stores().await?;
let root_work_directory = make_temp_path("root_work_directory");
fs::create_dir_all(&root_work_directory).await?;
Expand All @@ -341,14 +341,28 @@ mod running_actions_manager_tests {
arguments: vec![
"sh".to_string(),
"-c".to_string(),
"echo -n 123 > test.txt; echo -n foo-stdout; >&2 echo -n bar-stderr".to_string(),
"echo -n 123 > ../test.txt; echo -n foo-stdout; >&2 echo -n bar-stderr".to_string(),
],
output_paths: vec!["test.txt".to_string()],
working_directory: ".".to_string(),
working_directory: "some_cwd".to_string(),
..Default::default()
};
let command_digest = serialize_and_upload_message(&command, cas_store.as_ref()).await?;
let input_root_digest = serialize_and_upload_message(&Directory::default(), cas_store.as_ref()).await?;
let input_root_digest = serialize_and_upload_message(
&Directory {
directories: vec![DirectoryNode {
name: "some_cwd".to_string(),
digest: Some(
serialize_and_upload_message(&Directory::default(), cas_store.as_ref())
.await?
.into(),
),
}],
..Default::default()
},
cas_store.as_ref(),
)
.await?;
let action = Action {
command_digest: Some(command_digest.into()),
input_root_digest: Some(input_root_digest.into()),
Expand Down Expand Up @@ -414,7 +428,8 @@ mod running_actions_manager_tests {
)?,
exit_code: 0,
output_folders: vec![],
output_symlinks: vec![],
output_file_symlinks: vec![],
output_directory_symlinks: vec![],
server_logs: HashMap::new(),
execution_metadata: ExecutionMetadata {
worker: WORKER_ID.to_string(),
Expand Down Expand Up @@ -563,10 +578,11 @@ mod running_actions_manager_tests {
490
)?,
}],
output_symlinks: vec![SymlinkInfo {
output_file_symlinks: vec![SymlinkInfo {
name_or_path: NameOrPath::Path("empty_sym".to_string()),
target: "/dev/null".to_string(),
}],
output_directory_symlinks: vec![],
server_logs: HashMap::new(),
execution_metadata: ExecutionMetadata {
worker: WORKER_ID.to_string(),
Expand Down Expand Up @@ -651,7 +667,8 @@ mod running_actions_manager_tests {
)?,
exit_code: 33,
output_folders: vec![],
output_symlinks: vec![],
output_file_symlinks: vec![],
output_directory_symlinks: vec![],
server_logs: HashMap::new(),
execution_metadata: ExecutionMetadata {
worker: WORKER_ID.to_string(),
Expand Down

0 comments on commit 00431f9

Please sign in to comment.