Skip to content

Commit

Permalink
Fix inefficient upload of stderr/stdout in workers
Browse files Browse the repository at this point in the history
There was a serial operation of uploading stderr and stdout then
uploading the files. This PR makes it so they get uploaded in
parallel.

fixes: #212
  • Loading branch information
allada committed Jul 21, 2023
1 parent ea33b6c commit 8ac4824
Showing 1 changed file with 32 additions and 38 deletions.
70 changes: 32 additions & 38 deletions cas/worker/running_actions_manager.rs
Expand Up @@ -640,30 +640,6 @@ impl RunningActionImpl {
)
};
let cas_store = Pin::new(self.running_actions_manager.cas_store.as_ref());
let (stdout_digest, stderr_digest) = {
// Upload our stdout/stderr to our CAS store.
self.metrics()
.clone()
.upload_stderr_and_stdout
.wrap(try_join(
self.metrics().clone().upload_stdout.wrap(async {
let cursor = Cursor::new(execution_result.stdout);
let (digest, mut cursor) = compute_digest(cursor).await?;
cursor.rewind().await.err_tip(|| "Could not rewind cursor")?;
upload_to_store(cas_store, digest, &mut cursor).await?;
Result::<DigestInfo, Error>::Ok(digest)
}),
self.metrics().clone().upload_stderr.wrap(async {
let cursor = Cursor::new(execution_result.stderr);
let (digest, mut cursor) = compute_digest(cursor).await?;
cursor.rewind().await.err_tip(|| "Could not rewind cursor")?;
upload_to_store(cas_store, digest, &mut cursor).await?;
Result::<DigestInfo, Error>::Ok(digest)
}),
))
.await?
};

enum OutputType {
None,
File(FileInfo),
Expand Down Expand Up @@ -768,16 +744,40 @@ impl RunningActionImpl {
let mut output_folders = vec![];
let mut output_directory_symlinks = vec![];
let mut output_file_symlinks = vec![];
while let Some(output_type) = output_path_futures.try_next().await? {
match output_type {
OutputType::File(output_file) => output_files.push(output_file),
OutputType::Directory(output_folder) => output_folders.push(output_folder),
OutputType::FileSymlink(output_symlink) => output_file_symlinks.push(output_symlink),
OutputType::DirectorySymlink(output_symlink) => output_directory_symlinks.push(output_symlink),
OutputType::None => { /* Safe to ignore */ }

let stdout_digest_fut = self.metrics().upload_stdout.wrap(async {
let cursor = Cursor::new(execution_result.stdout);
let (digest, mut cursor) = compute_digest(cursor).await?;
cursor.rewind().await.err_tip(|| "Could not rewind cursor")?;
upload_to_store(cas_store, digest, &mut cursor).await?;
Result::<DigestInfo, Error>::Ok(digest)
});
let stderr_digest_fut = self.metrics().upload_stderr.wrap(async {
let cursor = Cursor::new(execution_result.stderr);
let (digest, mut cursor) = compute_digest(cursor).await?;
cursor.rewind().await.err_tip(|| "Could not rewind cursor")?;
upload_to_store(cas_store, digest, &mut cursor).await?;
Result::<DigestInfo, Error>::Ok(digest)
});

let upload_result = futures::try_join!(stdout_digest_fut, stderr_digest_fut, async {
while let Some(output_type) = output_path_futures.try_next().await? {
match output_type {
OutputType::File(output_file) => output_files.push(output_file),
OutputType::Directory(output_folder) => output_folders.push(output_folder),
OutputType::FileSymlink(output_symlink) => output_file_symlinks.push(output_symlink),
OutputType::DirectorySymlink(output_symlink) => output_directory_symlinks.push(output_symlink),
OutputType::None => { /* Safe to ignore */ }
}
}
}
Ok(())
});
drop(output_path_futures);
let (stdout_digest, stderr_digest) = match upload_result {
Ok((stdout_digest, stderr_digest, _)) => (stdout_digest, stderr_digest),
Err(e) => return Err(e).err_tip(|| "Error while uploading results"),
};

execution_metadata.output_upload_completed_timestamp = (self.running_actions_manager.now_fn)();
output_files.sort_unstable_by(|a, b| a.name_or_path.cmp(&b.name_or_path));
output_folders.sort_unstable_by(|a, b| a.path.cmp(&b.path));
Expand Down Expand Up @@ -1153,7 +1153,6 @@ pub struct Metrics {
child_process: AsyncCounterWrapper,
child_process_success_error_code: CounterWithTime,
child_process_failure_error_code: CounterWithTime,
upload_stderr_and_stdout: AsyncCounterWrapper,
upload_stdout: AsyncCounterWrapper,
upload_stderr: AsyncCounterWrapper,
}
Expand Down Expand Up @@ -1233,11 +1232,6 @@ impl MetricsComponent for Metrics {
&self.child_process_failure_error_code,
"Stats about the child_process_failure_error_code command.",
);
c.publish(
"upload_stderr_and_stdout",
&self.upload_stderr_and_stdout,
"Total time spent processing the upload of stderr and stdout (because they are in parallel).",
);
c.publish(
"upload_stdout",
&self.upload_stdout,
Expand Down

0 comments on commit 8ac4824

Please sign in to comment.