Skip to content

Commit

Permalink
Adds initial implementation for LocalWorker and supporting classes
Browse files Browse the repository at this point in the history
This is the initial draft of LocalWorker. It does not yet execute
code, but is a step closer to being able to do so.

LocalWorker should be able to:
* Register worker and notify server of properties
* Handle disconnects and attempt to reconnect to server
* Receive jobs from server and push them to RunningActionsManager
* Run keep alives
* Respond to server with result of what RunningActionManager posts
  • Loading branch information
allada committed Apr 21, 2022
1 parent baad561 commit 90cff23
Show file tree
Hide file tree
Showing 19 changed files with 1,166 additions and 43 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Expand Up @@ -53,6 +53,8 @@ rustfmt-nightly = { git = "https://github.com/rust-lang/rustfmt", tag = "v1.4.3
maplit = "1.0.2"
mock_instant = "0.2.1"
rusoto_mock = "=0.46.0"
hyper = "0.14.18"
ctor = "0.1.22"

[patch.crates-io]
rusoto_mock = { git = "https://github.com/allada/rusoto.git", rev = "cc9acca00dbafa41a37d75faeaf2a4baba33d42e" }
Expand Down
1 change: 1 addition & 0 deletions cas/BUILD
Expand Up @@ -16,6 +16,7 @@ rust_binary(
"//cas/scheduler",
"//cas/store",
"//cas/store:default_store_factory",
"//cas/worker:local_worker",
"//config",
"//third_party:clap",
"//third_party:env_logger",
Expand Down
18 changes: 17 additions & 1 deletion cas/cas_main.rs
Expand Up @@ -13,10 +13,11 @@ use ac_server::AcServer;
use bytestream_server::ByteStreamServer;
use capabilities_server::CapabilitiesServer;
use cas_server::CasServer;
use config::cas_server::CasConfig;
use config::cas_server::{CasConfig, WorkerConfig};
use default_store_factory::store_factory;
use error::{make_err, Code, Error, ResultExt};
use execution_server::ExecutionServer;
use local_worker::new_local_worker;
use scheduler::Scheduler;
use store::StoreManager;
use worker_api_server::WorkerApiServer;
Expand Down Expand Up @@ -76,6 +77,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

let mut futures: Vec<BoxFuture<Result<(), Error>>> = Vec::new();
for worker_cfg in cfg.workers.unwrap_or(vec![]) {
let spawn_fut = match worker_cfg {
WorkerConfig::local(local_worker_cfg) => {
let cas_store = store_manager.get_store(&local_worker_cfg.cas_store).err_tip(|| {
format!(
"Failed to find store for cas_store_ref in worker config : {}",
local_worker_cfg.cas_store
)
})?;
tokio::spawn(new_local_worker(Arc::new(local_worker_cfg), cas_store.clone()).run())
}
};
futures.push(Box::pin(spawn_fut.map_ok_or_else(|e| Err(e.into()), |v| v)));
}

for server_cfg in cfg.servers {
let mut server = Server::builder();
let services = server_cfg.services.ok_or_else(|| "'services' must be configured")?;
Expand Down
1 change: 0 additions & 1 deletion cas/grpc_service/BUILD
Expand Up @@ -181,7 +181,6 @@ rust_test(
"//third_party:futures",
"//third_party:maplit",
"//third_party:pretty_assertions",
"//third_party:prost",
"//third_party:tokio",
"//third_party:tokio_stream",
"//third_party:tonic",
Expand Down
41 changes: 6 additions & 35 deletions cas/grpc_service/tests/bytestream_server_test.rs
Expand Up @@ -7,11 +7,10 @@ use std::sync::Arc;
use bytestream_server::ByteStreamServer;
use futures::{pin_mut, poll, task::Poll};
use maplit::hashmap;
use prost::{bytes::Bytes, Message};
use tokio::task::yield_now;
use tonic::Request;

use common::DigestInfo;
use common::{encode_stream_proto, DigestInfo};
use config;
use default_store_factory::store_factory;
use error::{make_err, Code, Error, ResultExt};
Expand Down Expand Up @@ -45,34 +44,6 @@ fn make_bytestream_server(store_manager: &StoreManager) -> Result<ByteStreamServ
)
}

// Utility to encode our proto into GRPC stream format.
fn encode<T: Message>(proto: &T) -> Result<Bytes, Box<dyn std::error::Error>> {
use prost::bytes::{BufMut, BytesMut};
let mut buf = BytesMut::new();
// See below comment on spec.
use std::mem::size_of;
const PREFIX_BYTES: usize = size_of::<u8>() + size_of::<u32>();
for _ in 0..PREFIX_BYTES {
// Advance our buffer first.
// We will backfill it once we know the size of the message.
buf.put_u8(0);
}
proto.encode(&mut buf)?;
let len = buf.len() - PREFIX_BYTES;
{
let mut buf = &mut buf[0..PREFIX_BYTES];
// See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#:~:text=Compressed-Flag
// for more details on spec.
// Compressed-Flag -> 0 / 1 # encoded as 1 byte unsigned integer.
buf.put_u8(0);
// Message-Length -> {length of Message} # encoded as 4 byte unsigned integer (big endian).
buf.put_u32(len as u32);
// Message -> *{binary octet}.
}

Ok(buf.freeze())
}

#[cfg(test)]
pub mod write_tests {
use super::*;
Expand Down Expand Up @@ -134,18 +105,18 @@ pub mod write_tests {
// Write first chunk of data.
write_request.write_offset = 0;
write_request.data = raw_data[..BYTE_SPLIT_OFFSET].into();
tx.send_data(encode(&write_request)?).await?;
tx.send_data(encode_stream_proto(&write_request)?).await?;

// Write empty set of data (clients are allowed to do this.
write_request.write_offset = BYTE_SPLIT_OFFSET as i64;
write_request.data = vec![].into();
tx.send_data(encode(&write_request)?).await?;
tx.send_data(encode_stream_proto(&write_request)?).await?;

// Write final bit of data.
write_request.write_offset = BYTE_SPLIT_OFFSET as i64;
write_request.data = raw_data[BYTE_SPLIT_OFFSET..].into();
write_request.finish_write = true;
tx.send_data(encode(&write_request)?).await?;
tx.send_data(encode_stream_proto(&write_request)?).await?;

raw_data
};
Expand Down Expand Up @@ -387,7 +358,7 @@ pub mod query_tests {
// Write first chunk of data.
write_request.write_offset = 0;
write_request.data = raw_data[..BYTE_SPLIT_OFFSET].into();
tx.send_data(encode(&write_request)?).await?;
tx.send_data(encode_stream_proto(&write_request)?).await?;

{
// Check to see if our request is active.
Expand All @@ -410,7 +381,7 @@ pub mod query_tests {
write_request.write_offset = BYTE_SPLIT_OFFSET as i64;
write_request.data = raw_data[BYTE_SPLIT_OFFSET..].into();
write_request.finish_write = true;
tx.send_data(encode(&write_request)?).await?;
tx.send_data(encode_stream_proto(&write_request)?).await?;

{
// Now that it's done uploading, ensure it returns a success when requested again.
Expand Down
1 change: 1 addition & 0 deletions cas/scheduler/BUILD
Expand Up @@ -52,6 +52,7 @@ rust_library(
"//proto",
"//third_party:prost",
"//third_party:prost_types",
"//third_party:sha2",
"//third_party:tonic",
"//util:common",
"//util:error",
Expand Down
65 changes: 61 additions & 4 deletions cas/scheduler/action_messages.rs
Expand Up @@ -7,14 +7,17 @@ use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use sha2::{Digest as _, Sha256};

use common::{DigestInfo, HashMapExt, VecExt};
use error::{Error, ResultExt};
use error::{make_input_err, Error, ResultExt};
use platform_property_manager::PlatformProperties;
use prost::Message;
use prost_types::Any;
use proto::build::bazel::remote::execution::v2::{
execution_stage, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest, ExecuteResponse,
ExecutedActionMetadata, FileNode, LogFile, NodeProperties, OutputDirectory, OutputFile, OutputSymlink, SymlinkNode,
execution_stage, Action, ActionResult as ProtoActionResult, ExecuteOperationMetadata, ExecuteRequest,
ExecuteResponse, ExecutedActionMetadata, FileNode, LogFile, NodeProperties, OutputDirectory, OutputFile,
OutputSymlink, SymlinkNode,
};
use proto::google::longrunning::{operation::Result as LongRunningResult, Operation};

Expand All @@ -33,10 +36,23 @@ pub struct ActionInfoHashKey {
pub salt: u64,
}

impl ActionInfoHashKey {
/// Utility function used to make a unique hash of the digest including the salt.
pub fn get_hash(&self) -> [u8; 32] {
Sha256::new()
.chain(&self.digest.packed_hash)
.chain(&self.digest.size_bytes.to_le_bytes())
.chain(&self.salt.to_le_bytes())
.finalize()
.into()
}
}

/// Information needed to execute an action. This struct is used over bazel's proto `Action`
/// for simplicity and offers a `salt`, which is useful to ensure during hashing (for dicts)
/// to ensure we never match against another `ActionInfo` (when a task should never be cached).
/// This struct must be 100% compatible with `ExecuteRequest` struct in remote_execution.proto.
/// This struct must be 100% compatible with `ExecuteRequest` struct in remote_execution.proto
/// except for the salt field.
#[derive(Clone, Debug)]
pub struct ActionInfo {
/// Instance name used to send the request.
Expand Down Expand Up @@ -78,6 +94,45 @@ impl ActionInfo {
pub fn salt(&self) -> &u64 {
&self.unique_qualifier.salt
}

pub fn try_from_action_and_execute_request_with_salt(
execute_request: ExecuteRequest,
action: Action,
salt: u64,
) -> Result<Self, Error> {
Ok(Self {
instance_name: execute_request.instance_name,
command_digest: action
.command_digest
.err_tip(|| "Expected command_digest to exist on Action")?
.try_into()?,
input_root_digest: action
.input_root_digest
.err_tip(|| "Expected input_root_digest to exist on Action")?
.try_into()?,
timeout: action
.timeout
.err_tip(|| "Expected timeout to exist on Action")?
.try_into()
.map_err(|_| make_input_err!("Failed convert proto duration to system duration"))?,
platform_properties: action
.platform
.err_tip(|| "Expected platform to exist on Action")?
.try_into()?,
priority: execute_request
.execution_policy
.err_tip(|| "Expected execution_policy to exist on ExecuteRequest")?
.priority,
insert_timestamp: SystemTime::UNIX_EPOCH, // We can't know it at this point.
unique_qualifier: ActionInfoHashKey {
digest: execute_request
.action_digest
.err_tip(|| "Expected action_digest to exist on ExecuteRequest")?
.try_into()?,
salt,
},
})
}
}

impl Into<ExecuteRequest> for &ActionInfo {
Expand Down Expand Up @@ -478,6 +533,8 @@ pub enum ActionStage {
CacheCheck,
/// Action has been accepted and waiting for worker to take it.
Queued,
// TODO(allada) We need a way to know if the job was sent to a worker, but hasn't begun
// execution yet.
/// Worker is executing the action.
Executing,
/// Worker completed the work with result.
Expand Down
104 changes: 104 additions & 0 deletions cas/worker/BUILD
@@ -0,0 +1,104 @@
# Copyright 2022 Nathan (Blaise) Bruer. All rights reserved.

load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test")

rust_library(
name = "local_worker",
srcs = ["local_worker.rs"],
deps = [
"//cas/store",
"//config",
"//proto",
"//third_party:futures",
"//third_party:tokio",
"//third_party:tokio_stream",
"//third_party:tonic",
"//util:common",
"//util:error",
":running_actions_manager",
":worker_api_client_wrapper",
":worker_utils",
],
visibility = ["//cas:__pkg__"],
)

rust_library(
name = "running_actions_manager",
srcs = ["running_actions_manager.rs"],
deps = [
"//cas/scheduler:action_messages",
"//cas/store",
"//cas/store:ac_utils",
"//proto",
"//third_party:fast_async_mutex",
"//util:common",
"//util:error",
],
proc_macro_deps = ["//third_party:async_trait"],
)


rust_library(
name = "worker_utils",
srcs = ["worker_utils.rs"],
deps = [
"//config",
"//proto",
"//third_party:futures",
"//third_party:shlex",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
],
)

rust_library(
name = "worker_api_client_wrapper",
srcs = ["worker_api_client_wrapper.rs"],
deps = [
"//proto",
"//third_party:tonic",
],
proc_macro_deps = ["//third_party:async_trait"],
)

rust_library(
name = "local_worker_test_utils",
srcs = ["local_worker_test_utils.rs"],
testonly = True,
deps = [
"//config",
"//proto",
"//third_party:fast_async_mutex",
"//third_party:hyper",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
":local_worker",
":running_actions_manager",
":worker_api_client_wrapper",
],
proc_macro_deps = ["//third_party:async_trait"],
)

rust_test(
name = "local_worker_test",
srcs = ["tests/local_worker_test.rs"],
deps = [
"//cas/scheduler:action_messages",
"//cas/scheduler:platform_property_manager",
"//config",
"//proto",
"//third_party:env_logger",
"//third_party:pretty_assertions",
"//third_party:prost",
"//third_party:tokio",
"//third_party:tonic",
"//util:common",
"//util:error",
":local_worker_test_utils",
],
proc_macro_deps = ["//third_party:async_trait", "//third_party:ctor"],
)

0 comments on commit 90cff23

Please sign in to comment.