Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set max line length to Rust's defaults #750

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
max_width = 120
reorder_imports = true
imports_granularity = "Module"
group_imports = "StdExternalCrate"
Expand Down
45 changes: 36 additions & 9 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,38 +262,65 @@ pub struct TlsConfig {
pub struct HttpServerConfig {
/// Interval to send keep-alive pings via HTTP2.
/// Note: This is in seconds.
#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub http2_keep_alive_interval: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_pending_accept_reset_streams: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_initial_stream_window_size: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_initial_connection_window_size: Option<u32>,

#[serde(default)]
pub experimental_http2_adaptive_window: Option<bool>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_frame_size: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_concurrent_streams: Option<u32>,

/// Note: This is in seconds.
#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_keep_alive_timeout: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_send_buf_size: Option<u32>,

#[serde(default)]
pub experimental_http2_enable_connect_protocol: Option<bool>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_header_list_size: Option<u32>,
}

Expand Down
8 changes: 6 additions & 2 deletions nativelink-config/src/serde_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ where
}

/// Same as convert_numeric_with_shellexpand, but supports Option<T>.
pub fn convert_optional_numeric_with_shellexpand<'de, D, T, E>(deserializer: D) -> Result<Option<T>, D::Error>
pub fn convert_optional_numeric_with_shellexpand<'de, D, T, E>(
deserializer: D,
) -> Result<Option<T>, D::Error>
where
D: Deserializer<'de>,
E: fmt::Display,
Expand Down Expand Up @@ -102,7 +104,9 @@ where

/// Helper for serde macro so you can use shellexpand variables in the json configuration
/// files when the number is a numeric type.
pub fn convert_string_with_shellexpand<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> {
pub fn convert_string_with_shellexpand<'de, D: Deserializer<'de>>(
deserializer: D,
) -> Result<String, D::Error> {
let value = String::deserialize(deserializer)?;
Ok((*(shellexpand::env(&value).map_err(de::Error::custom)?)).to_string())
}
Expand Down
3 changes: 2 additions & 1 deletion nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
use serde::{Deserialize, Serialize};

use crate::serde_utils::{
convert_numeric_with_shellexpand, convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
convert_numeric_with_shellexpand, convert_optional_string_with_shellexpand,
convert_string_with_shellexpand,
};

/// Name of the store. This type will be used when referencing a store
Expand Down
14 changes: 11 additions & 3 deletions nativelink-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ impl Error {
if !msg.is_empty() {
msgs.push(msg);
}
Self { code, messages: msgs }
Self {
code,
messages: msgs,
}
}

#[inline]
Expand All @@ -72,7 +75,10 @@ impl Error {
}

#[must_use]
pub fn merge_option<T: Into<Self>, U: Into<Self>>(this: Option<T>, other: Option<U>) -> Option<Self> {
pub fn merge_option<T: Into<Self>, U: Into<Self>>(
this: Option<T>,
other: Option<U>,
) -> Option<Self> {
if let Some(this) = this {
if let Some(other) = other {
return Some(this.into().merge(other));
Expand Down Expand Up @@ -396,7 +402,9 @@ impl From<std::io::ErrorKind> for Code {
| std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted => Self::Unavailable,
std::io::ErrorKind::AlreadyExists => Self::AlreadyExists,
std::io::ErrorKind::InvalidInput | std::io::ErrorKind::InvalidData => Self::InvalidArgument,
std::io::ErrorKind::InvalidInput | std::io::ErrorKind::InvalidData => {
Self::InvalidArgument
}
std::io::ErrorKind::TimedOut => Self::DeadlineExceeded,
std::io::ErrorKind::Interrupted => Self::Aborted,
std::io::ErrorKind::NotConnected
Expand Down
5 changes: 4 additions & 1 deletion nativelink-proto/gen_protos_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ fn main() -> std::io::Result<()> {
.help("Output directory"),
)
.get_matches();
let paths = matches.get_many::<String>("inputs").unwrap().collect::<Vec<&String>>();
let paths = matches
.get_many::<String>("inputs")
.unwrap()
.collect::<Vec<&String>>();
let output_dir = PathBuf::from(matches.get_one::<String>("output_dir").unwrap());

let mut config = Config::new();
Expand Down
10 changes: 8 additions & 2 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ use crate::platform_property_manager::PlatformPropertyManager;
#[async_trait]
pub trait ActionScheduler: Sync + Send + Unpin {
/// Returns the platform property manager.
async fn get_platform_property_manager(&self, instance_name: &str) -> Result<Arc<PlatformPropertyManager>, Error>;
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error>;

/// Adds an action to the scheduler for remote execution.
async fn add_action(&self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error>;
async fn add_action(
&self,
action_info: ActionInfo,
) -> Result<watch::Receiver<Arc<ActionState>>, Error>;

/// Find an existing action by its name.
async fn find_existing_action(
Expand Down
56 changes: 40 additions & 16 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
};
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState};
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::Store;
use parking_lot::{Mutex, MutexGuard};
Expand Down Expand Up @@ -83,10 +85,14 @@ async fn get_action_from_store(
}
}

async fn validate_outputs_exist(cas_store: &Arc<dyn Store>, action_result: &ProtoActionResult) -> bool {
async fn validate_outputs_exist(
cas_store: &Arc<dyn Store>,
action_result: &ProtoActionResult,
) -> bool {
// Verify that output_files and output_directories are available in the cas.
let mut required_digests =
Vec::with_capacity(action_result.output_files.len() + action_result.output_directories.len());
let mut required_digests = Vec::with_capacity(
action_result.output_files.len() + action_result.output_directories.len(),
);
for digest in action_result
.output_files
.iter()
Expand All @@ -104,7 +110,10 @@ async fn validate_outputs_exist(cas_store: &Arc<dyn Store>, action_result: &Prot
required_digests.push(digest);
}

let Ok(sizes) = Pin::new(cas_store.as_ref()).has_many(&required_digests).await else {
let Ok(sizes) = Pin::new(cas_store.as_ref())
.has_many(&required_digests)
.await
else {
return false;
};
sizes.into_iter().all(|size| size.is_some())
Expand Down Expand Up @@ -142,11 +151,19 @@ impl CacheLookupScheduler {

#[async_trait]
impl ActionScheduler for CacheLookupScheduler {
async fn get_platform_property_manager(&self, instance_name: &str) -> Result<Arc<PlatformPropertyManager>, Error> {
self.action_scheduler.get_platform_property_manager(instance_name).await
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
self.action_scheduler
.get_platform_property_manager(instance_name)
.await
}

async fn add_action(&self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
async fn add_action(
&self,
action_info: ActionInfo,
) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
if action_info.skip_cache_lookup {
// Cache lookup skipped, forward to the upstream.
return self.action_scheduler.add_action(action_info).await;
Expand All @@ -160,7 +177,9 @@ impl ActionScheduler for CacheLookupScheduler {
let scope_guard = {
let mut cache_check_actions = self.cache_check_actions.lock();
// Check this isn't a duplicate request first.
if let Some(rx) = subscribe_to_existing_action(&cache_check_actions, &action_info.unique_qualifier) {
if let Some(rx) =
subscribe_to_existing_action(&cache_check_actions, &action_info.unique_qualifier)
{
return Ok(rx);
}
cache_check_actions.insert(action_info.unique_qualifier.clone(), tx.clone());
Expand All @@ -185,11 +204,13 @@ impl ActionScheduler for CacheLookupScheduler {
let action_digest = current_state.action_digest();
let instance_name = action_info.instance_name().clone();
if let Some(action_result) =
get_action_from_store(Pin::new(ac_store.as_ref()), *action_digest, instance_name).await
get_action_from_store(Pin::new(ac_store.as_ref()), *action_digest, instance_name)
.await
{
if validate_outputs_exist(&cas_store, &action_result).await {
// Found in the cache, return the result immediately.
Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(action_result);
Arc::make_mut(&mut current_state).stage =
ActionStage::CompletedFromCache(action_result);
let _ = tx.send(current_state);
return;
}
Expand All @@ -212,10 +233,11 @@ impl ActionScheduler for CacheLookupScheduler {
}
}
Err(err) => {
Arc::make_mut(&mut current_state).stage = ActionStage::Completed(ActionResult {
error: Some(err),
..Default::default()
});
Arc::make_mut(&mut current_state).stage =
ActionStage::Completed(ActionResult {
error: Some(err),
..Default::default()
});
let _ = tx.send(current_state);
}
}
Expand All @@ -234,7 +256,9 @@ impl ActionScheduler for CacheLookupScheduler {
}
}
// Cache skipped may be in the upstream scheduler.
self.action_scheduler.find_existing_action(unique_qualifier).await
self.action_scheduler
.find_existing_action(unique_qualifier)
.await
}

async fn clean_recently_completed_actions(&self) {}
Expand Down
11 changes: 8 additions & 3 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use crate::property_modifier_scheduler::PropertyModifierScheduler;
use crate::simple_scheduler::SimpleScheduler;
use crate::worker_scheduler::WorkerScheduler;

pub type SchedulerFactoryResults = (Option<Arc<dyn ActionScheduler>>, Option<Arc<dyn WorkerScheduler>>);
pub type SchedulerFactoryResults = (
Option<Arc<dyn ActionScheduler>>,
Option<Arc<dyn WorkerScheduler>>,
);

pub fn scheduler_factory(
scheduler_type_cfg: &SchedulerConfig,
Expand Down Expand Up @@ -95,14 +98,16 @@ fn inner_scheduler_factory(
// (ActionScheduler and WorkerScheduler) and we need to be able to know if the underlying scheduler
// has already been visited, not just the trait. `Any` could be used, but that'd require some rework
// of all the schedulers. This is the most simple way to do it. Rust's uintptr_t is usize.
let action_scheduler_uintptr: usize = Arc::as_ptr(action_scheduler).cast::<()>() as usize;
let action_scheduler_uintptr: usize =
Arc::as_ptr(action_scheduler).cast::<()>() as usize;
if !visited_schedulers.contains(&action_scheduler_uintptr) {
visited_schedulers.insert(action_scheduler_uintptr);
action_scheduler.clone().register_metrics(scheduler_metrics);
}
}
if let Some(worker_scheduler) = &scheduler.1 {
let worker_scheduler_uintptr: usize = Arc::as_ptr(worker_scheduler).cast::<()>() as usize;
let worker_scheduler_uintptr: usize =
Arc::as_ptr(worker_scheduler).cast::<()>() as usize;
if !visited_schedulers.contains(&worker_scheduler_uintptr) {
visited_schedulers.insert(worker_scheduler_uintptr);
worker_scheduler.clone().register_metrics(scheduler_metrics);
Expand Down
Loading
Loading