First draft to get remote execution working
Implements:
* RunningActionsManager
* RunningAction
* Ability to extract a digest of Directory proto into real files
* Ability to upload the contents of a directory & files/symlinks
* Ability to execute arbitrary commands
* Basic cleanup capability

These new tests also found a bug in filesystem_store where it
was not ensuring the data made it to the filesystem before the
future resolved. This could cause an immediate read of the file
to return different data, because the data had not been synced
yet (a minimal illustrative sketch follows this message).

In theory this PR will allow Bazel to execute jobs and should meet
all the requirements; however, it is completely untested in Bazel
and currently covered only by unit tests.
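
For illustration, a minimal sketch of the sync-before-resolve pattern the fix applies, using plain tokio::fs rather than the store's own file wrapper (write_then_sync is a hypothetical helper, not part of this commit):

use tokio::fs::File;
use tokio::io::AsyncWriteExt;

// Hypothetical helper: do not resolve the write future until the bytes
// have been synced, so an immediate follow-up read sees the same data.
async fn write_then_sync(file: &mut File, data: &[u8]) -> std::io::Result<()> {
    file.write_all(data).await?;
    // Without this, the future could resolve before the data was synced,
    // which is the bug described above.
    file.sync_data().await?;
    Ok(())
}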
allada committed Apr 28, 2022
1 parent 5daa4ff commit f207dfa
Showing 20 changed files with 1,239 additions and 104 deletions.
29 changes: 20 additions & 9 deletions cas/scheduler/action_messages.rs
@@ -16,8 +16,8 @@ use prost::Message;
 use prost_types::Any;
 use proto::build::bazel::remote::execution::v2::{
     execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
-    ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, OutputDirectory, OutputFile, OutputSymlink,
-    SymlinkNode,
+    ExecuteResponse, ExecutedActionMetadata, ExecutionPolicy, FileNode, LogFile, OutputDirectory, OutputFile,
+    OutputSymlink, Platform, SymlinkNode,
 };
 use proto::google::longrunning::{operation::Result as LongRunningResult, Operation};
@@ -112,16 +112,13 @@ impl ActionInfo {
                 .try_into()?,
             timeout: action
                 .timeout
-                .err_tip(|| "Expected timeout to exist on Action")?
+                .unwrap_or(prost_types::Duration::default())
                 .try_into()
                 .map_err(|_| make_input_err!("Failed convert proto duration to system duration"))?,
-            platform_properties: action
-                .platform
-                .err_tip(|| "Expected platform to exist on Action")?
-                .try_into()?,
+            platform_properties: action.platform.unwrap_or(Platform::default()).try_into()?,
             priority: execute_request
                 .execution_policy
-                .err_tip(|| "Expected execution_policy to exist on ExecuteRequest")?
+                .unwrap_or(ExecutionPolicy::default())
                 .priority,
             insert_timestamp: SystemTime::UNIX_EPOCH, // We can't know it at this point.
             unique_qualifier: ActionInfoHashKey {
@@ -223,12 +220,26 @@ impl Eq for ActionInfoHashKey {}
 /// This is in order to be able to reuse the same struct instead of building different
 /// structs when converting `FileInfo` -> {`OutputFile`, `FileNode`} and other similar
 /// structs.
-#[derive(Eq, PartialEq, Debug, Clone)]
+#[derive(Eq, PartialEq, PartialOrd, Debug, Clone)]
 pub enum NameOrPath {
     Name(String),
     Path(String),
 }
+
+impl Ord for NameOrPath {
+    fn cmp(&self, other: &Self) -> Ordering {
+        let self_lexical_name = match self {
+            NameOrPath::Name(name) => name,
+            NameOrPath::Path(path) => path,
+        };
+        let other_lexical_name = match other {
+            NameOrPath::Name(name) => name,
+            NameOrPath::Path(path) => path,
+        };
+        self_lexical_name.cmp(other_lexical_name)
+    }
+}
 
 /// Represents an individual file and associated metadata.
 /// This struct must be 100% compatible with `OutputFile` and `FileNode` structs in
 /// remote_execution.proto.
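
For illustration, a small usage sketch of the new ordering (hypothetical values; NameOrPath as defined above): both variants compare by their inner string, so mixed Name/Path lists sort lexically. Note that the derived PartialOrd compares the variant first, so it can disagree with this manual Ord for mixed variants.

// Hypothetical example: Ord compares the inner string regardless of variant.
let mut entries = vec![
    NameOrPath::Path("b/out.txt".to_string()),
    NameOrPath::Name("a.txt".to_string()),
];
entries.sort(); // Uses the manual Ord impl: plain lexical comparison.
assert_eq!(entries[0], NameOrPath::Name("a.txt".to_string()));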
5 changes: 5 additions & 0 deletions cas/store/BUILD
@@ -54,7 +54,12 @@ rust_library(
     name = "ac_utils",
     srcs = ["ac_utils.rs"],
     deps = [
+        "//third_party:bytes",
+        "//third_party:futures",
         "//third_party:prost",
+        "//third_party:sha2",
+        "//third_party:tokio",
+        "//util:buf_channel",
         "//util:common",
         "//util:error",
         ":store",
101 changes: 99 additions & 2 deletions cas/store/ac_utils.rs
@@ -1,12 +1,19 @@
 // Copyright 2021 Nathan (Blaise) Bruer. All rights reserved.
 
 use std::default::Default;
+use std::io::Cursor;
 use std::pin::Pin;
 
+use bytes::BytesMut;
+use futures::{future::try_join, Future, FutureExt, TryFutureExt};
+use prost::Message;
+use sha2::{Digest, Sha256};
+use tokio::io::{AsyncRead, AsyncReadExt};
+
+use buf_channel::make_buf_channel_pair;
 use common::DigestInfo;
 use error::{Code, Error, ResultExt};
-use prost::Message;
-use store::Store;
+use store::{Store, UploadSizeInfo};
 
 // NOTE(blaise.bruer) From some local testing it looks like action cache items are rarely greater than
 // 1.2k. Giving a bit more just in case to reduce allocs.
@@ -42,3 +49,93 @@ pub async fn get_and_decode_digest<T: Message + Default>(
 
     T::decode(store_data).err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
 }
+
+/// Takes a proto message and will serialize it and upload it to the provided store.
+pub fn serialize_and_upload_message<'a, T: Message>(
+    message: &'a T,
+    cas_store: Pin<&'a dyn Store>,
+) -> impl Future<Output = Result<DigestInfo, Error>> + 'a {
+    async move {
+        let mut buffer = BytesMut::new();
+        let digest = {
+            message
+                .encode(&mut buffer)
+                .err_tip(|| "Could not encode directory proto")?;
+            let mut hasher = Sha256::new();
+            hasher.update(&buffer);
+            DigestInfo::new(hasher.finalize().into(), buffer.len() as i64)
+        };
+        upload_to_store(cas_store, digest.clone(), &mut Cursor::new(buffer)).await?;
+        Ok(digest)
+    }
+}
+
+/// Given a bytestream computes the digest for the data.
+/// Note: This will happen in a new spawn since computing digests can be thread intensive.
+pub fn compute_digest<R: AsyncRead + Unpin + Send + 'static>(
+    mut reader: R,
+) -> impl Future<Output = Result<(DigestInfo, R), Error>> {
+    tokio::spawn(async move {
+        const DEFAULT_READ_BUFF_SIZE: usize = 4096;
+        let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
+        let mut hasher = Sha256::new();
+        let mut digest_size = 0;
+        loop {
+            reader
+                .read_buf(&mut chunk)
+                .await
+                .err_tip(|| "Could not read chunk during compute_digest")?;
+            if chunk.is_empty() {
+                break; // EOF.
+            }
+            digest_size += chunk.len();
+            hasher.update(&chunk);
+            chunk.clear();
+        }
+
+        Ok((DigestInfo::new(hasher.finalize().into(), digest_size as i64), reader))
+    })
+    .map(|r| r.err_tip(|| "Failed to launch spawn")?)
+}
+
+/// Uploads data to our store for given digest.
+/// Sadly we cannot upload our data while computing our hash, this means that we often
+/// will need to read the file two times, one to hash the file and the other to upload
+/// it. In the future we could possibly upload to store while computing the hash and
+/// then "finish" the upload by giving the digest, but not all stores will support this
+/// for now we will just always read twice.
+pub fn upload_to_store<'a, R: AsyncRead + Unpin>(
+    cas_store: Pin<&'a dyn Store>,
+    digest: DigestInfo,
+    reader: &'a mut R,
+) -> impl Future<Output = Result<(), Error>> + 'a {
+    let (mut tx, rx) = make_buf_channel_pair();
+    let upload_to_store_fut = cas_store
+        .update(
+            digest.clone(),
+            rx,
+            UploadSizeInfo::ExactSize(digest.size_bytes as usize),
+        )
+        .map(|r| r.err_tip(|| "Could not upload data to store in upload_to_store"));
+    let read_data_fut = async move {
+        loop {
+            const DEFAULT_READ_BUFF_SIZE: usize = 4096;
+            let mut chunk = BytesMut::with_capacity(DEFAULT_READ_BUFF_SIZE);
+            reader
+                .read_buf(&mut chunk)
+                .await
+                .err_tip(|| "Could not read chunk during upload_to_store")?;
+            if chunk.len() == 0 {
+                break; // EOF.
+            }
+            tx.send(chunk.freeze())
+                .await
+                .err_tip(|| "Could not send buffer data to store in upload_to_store")?;
+        }
+        tx.send_eof()
+            .await
+            .err_tip(|| "Could not send EOF to store in upload_to_store")?;
+        Ok(())
+    };
+    try_join(read_data_fut, upload_to_store_fut).map_ok(|(_, _)| ())
+}
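
A usage sketch tying these helpers together (hash_and_upload is a hypothetical helper, assuming a seekable tokio::fs::File): because hashing and uploading are separate passes, the reader must be rewound between them.

use std::io::SeekFrom;
use std::pin::Pin;
use tokio::io::AsyncSeekExt;

// Hypothetical helper: hash the file once, rewind, then upload it.
async fn hash_and_upload(cas_store: Pin<&dyn Store>, file: tokio::fs::File) -> Result<DigestInfo, Error> {
    let (digest, mut file) = compute_digest(file).await?;
    // compute_digest consumed the stream, so seek back for the second pass.
    file.seek(SeekFrom::Start(0)).await?;
    upload_to_store(cas_store, digest.clone(), &mut file).await?;
    Ok(digest)
}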
4 changes: 2 additions & 2 deletions cas/store/compression_store.rs
@@ -560,7 +560,7 @@ impl StoreTrait for CompressionStore {
         Ok(())
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
     }
 }
4 changes: 2 additions & 2 deletions cas/store/dedup_store.rs
@@ -325,7 +325,7 @@ impl StoreTrait for DedupStore {
         Ok(())
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
     }
 }
6 changes: 3 additions & 3 deletions cas/store/fast_slow_store.rs
@@ -33,7 +33,7 @@ impl FastSlowStore {
         Self { fast_store, slow_store }
     }
 
-    pub fn fast_slow<'a>(&'a self) -> &'a Arc<dyn StoreTrait> {
+    pub fn fast_store<'a>(&'a self) -> &'a Arc<dyn StoreTrait> {
         &self.fast_store
     }
 
@@ -211,7 +211,7 @@ impl StoreTrait for FastSlowStore {
         Ok(())
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
     }
 }
10 changes: 8 additions & 2 deletions cas/store/filesystem_store.rs
@@ -317,6 +317,12 @@ impl FilesystemStore {
             file_size += data_len as u64;
         }
 
+        temp_file
+            .as_ref()
+            .sync_data()
+            .await
+            .err_tip(|| format!("Failed to sync_data in filesystem store {}", temp_loc))?;
+
         let entry = Arc::new(FileEntry {
             digest: digest.clone(),
             file_size,
@@ -437,7 +443,7 @@ impl StoreTrait for FilesystemStore {
         Ok(())
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
     }
 }
4 changes: 2 additions & 2 deletions cas/store/memory_store.rs
@@ -112,7 +112,7 @@ impl StoreTrait for MemoryStore {
         Ok(())
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
     }
 }
4 changes: 2 additions & 2 deletions cas/store/ref_store.rs
@@ -105,7 +105,7 @@ impl StoreTrait for RefStore {
         Pin::new(store.as_ref()).get_part(digest, writer, offset, length).await
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
     }
 }
4 changes: 2 additions & 2 deletions cas/store/s3_store.rs
@@ -483,7 +483,7 @@ impl StoreTrait for S3Store {
             .await
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
     }
 }
4 changes: 2 additions & 2 deletions cas/store/size_partitioning_store.rs
@@ -73,7 +73,7 @@ impl StoreTrait for SizePartitioningStore {
             .await
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
    }
 }
3 changes: 2 additions & 1 deletion cas/store/store_trait.rs
@@ -95,5 +95,6 @@ pub trait StoreTrait: Sync + Send + Unpin {
             .merge(data_res.err_tip(|| "Failed to read stream to completion in get_part_unchunked"))
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any>;
+    /// Expect the returned Any to be `Arc<Self>`.
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any>;
 }
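
For illustration, a caller-side sketch of the expected usage (hypothetical helper, with MemoryStore standing in for any concrete store): since the Box holds an Arc<Self>, the downcast target is the Arc, not the bare store type.

use std::sync::Arc;

// Hypothetical helper: recover the concrete store from a trait object.
fn downcast_memory_store(store: Arc<dyn StoreTrait>) -> Option<Arc<MemoryStore>> {
    store
        .as_any()
        .downcast::<Arc<MemoryStore>>() // The Box is expected to hold an Arc<MemoryStore>.
        .ok()
        .map(|boxed_arc| *boxed_arc)
}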
4 changes: 2 additions & 2 deletions cas/store/verify_store.rs
@@ -132,7 +132,7 @@ impl StoreTrait for VerifyStore {
         self.pin_inner().get_part(digest, writer, offset, length).await
     }
 
-    fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any> {
-        self
+    fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any> {
+        Box::new(self)
     }
 }
13 changes: 12 additions & 1 deletion cas/worker/BUILD
@@ -6,6 +6,7 @@ rust_library(
     name = "local_worker",
     srcs = ["local_worker.rs"],
     deps = [
+        "//cas/scheduler:action_messages",
         "//cas/store",
         "//cas/store:fast_slow_store",
         "//config",
@@ -28,14 +29,20 @@ rust_library(
     srcs = ["running_actions_manager.rs"],
     deps = [
         "//cas/scheduler:action_messages",
+        "//cas/store",
+        "//cas/store:ac_utils",
         "//cas/store:fast_slow_store",
         "//cas/store:filesystem_store",
         "//proto",
+        "//third_party:bytes",
         "//third_party:fast_async_mutex",
+        "//third_party:filetime",
         "//third_party:futures",
         "//third_party:hex",
+        "//third_party:relative_path",
         "//third_party:tokio",
-        "//third_party:filetime",
+        "//third_party:tokio_stream",
+        "//third_party:tokio_util",
         "//util:common",
         "//util:error",
     ],
@@ -106,6 +113,7 @@ rust_library(
     srcs = ["tests/utils/mock_running_actions_manager.rs"],
    testonly = True,
     deps = [
+        "//cas/scheduler:action_messages",
         "//proto",
         "//third_party:fast_async_mutex",
         "//third_party:tokio",
@@ -119,12 +127,15 @@ rust_test(
     name = "running_actions_manager_test",
     srcs = ["tests/running_actions_manager_test.rs"],
     deps = [
+        "//cas/scheduler:action_messages",
         "//cas/store",
+        "//cas/store:ac_utils",
         "//cas/store:fast_slow_store",
         "//cas/store:filesystem_store",
         "//cas/store:memory_store",
         "//config",
         "//proto",
+        "//third_party:futures",
         "//third_party:pretty_assertions",
         "//third_party:prost",
         "//third_party:rand",
