Skip to content

Commit

Permalink
Set max line length to Rust's defaults
Browse files Browse the repository at this point in the history
... which is 100 馃珷
  • Loading branch information
aaronmondal committed Mar 11, 2024
1 parent cea2336 commit 0d9e7b1
Show file tree
Hide file tree
Showing 93 changed files with 4,764 additions and 1,825 deletions.
1 change: 0 additions & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
max_width = 120
reorder_imports = true
imports_granularity = "Module"
group_imports = "StdExternalCrate"
Expand Down
45 changes: 36 additions & 9 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,38 +262,65 @@ pub struct TlsConfig {
pub struct HttpServerConfig {
/// Interval to send keep-alive pings via HTTP2.
/// Note: This is in seconds.
#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub http2_keep_alive_interval: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_pending_accept_reset_streams: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_initial_stream_window_size: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_initial_connection_window_size: Option<u32>,

#[serde(default)]
pub experimental_http2_adaptive_window: Option<bool>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_frame_size: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_concurrent_streams: Option<u32>,

/// Note: This is in seconds.
#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_keep_alive_timeout: Option<u32>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_send_buf_size: Option<u32>,

#[serde(default)]
pub experimental_http2_enable_connect_protocol: Option<bool>,

#[serde(default, deserialize_with = "convert_optional_numeric_with_shellexpand")]
#[serde(
default,
deserialize_with = "convert_optional_numeric_with_shellexpand"
)]
pub experimental_http2_max_header_list_size: Option<u32>,
}

Expand Down
8 changes: 6 additions & 2 deletions nativelink-config/src/serde_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ where
}

/// Same as convert_numeric_with_shellexpand, but supports Option<T>.
pub fn convert_optional_numeric_with_shellexpand<'de, D, T, E>(deserializer: D) -> Result<Option<T>, D::Error>
pub fn convert_optional_numeric_with_shellexpand<'de, D, T, E>(
deserializer: D,
) -> Result<Option<T>, D::Error>
where
D: Deserializer<'de>,
E: fmt::Display,
Expand Down Expand Up @@ -102,7 +104,9 @@ where

/// Helper for serde macro so you can use shellexpand variables in the json configuration
/// files when the number is a numeric type.
pub fn convert_string_with_shellexpand<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> {
pub fn convert_string_with_shellexpand<'de, D: Deserializer<'de>>(
deserializer: D,
) -> Result<String, D::Error> {
let value = String::deserialize(deserializer)?;
Ok((*(shellexpand::env(&value).map_err(de::Error::custom)?)).to_string())
}
Expand Down
3 changes: 2 additions & 1 deletion nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
use serde::{Deserialize, Serialize};

use crate::serde_utils::{
convert_numeric_with_shellexpand, convert_optional_string_with_shellexpand, convert_string_with_shellexpand,
convert_numeric_with_shellexpand, convert_optional_string_with_shellexpand,
convert_string_with_shellexpand,
};

/// Name of the store. This type will be used when referencing a store
Expand Down
14 changes: 11 additions & 3 deletions nativelink-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ impl Error {
if !msg.is_empty() {
msgs.push(msg);
}
Self { code, messages: msgs }
Self {
code,
messages: msgs,
}
}

#[inline]
Expand All @@ -72,7 +75,10 @@ impl Error {
}

#[must_use]
pub fn merge_option<T: Into<Self>, U: Into<Self>>(this: Option<T>, other: Option<U>) -> Option<Self> {
pub fn merge_option<T: Into<Self>, U: Into<Self>>(
this: Option<T>,
other: Option<U>,
) -> Option<Self> {
if let Some(this) = this {
if let Some(other) = other {
return Some(this.into().merge(other));
Expand Down Expand Up @@ -396,7 +402,9 @@ impl From<std::io::ErrorKind> for Code {
| std::io::ErrorKind::ConnectionReset
| std::io::ErrorKind::ConnectionAborted => Self::Unavailable,
std::io::ErrorKind::AlreadyExists => Self::AlreadyExists,
std::io::ErrorKind::InvalidInput | std::io::ErrorKind::InvalidData => Self::InvalidArgument,
std::io::ErrorKind::InvalidInput | std::io::ErrorKind::InvalidData => {
Self::InvalidArgument
}
std::io::ErrorKind::TimedOut => Self::DeadlineExceeded,
std::io::ErrorKind::Interrupted => Self::Aborted,
std::io::ErrorKind::NotConnected
Expand Down
5 changes: 4 additions & 1 deletion nativelink-proto/gen_protos_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ fn main() -> std::io::Result<()> {
.help("Output directory"),
)
.get_matches();
let paths = matches.get_many::<String>("inputs").unwrap().collect::<Vec<&String>>();
let paths = matches
.get_many::<String>("inputs")
.unwrap()
.collect::<Vec<&String>>();
let output_dir = PathBuf::from(matches.get_one::<String>("output_dir").unwrap());

let mut config = Config::new();
Expand Down
10 changes: 8 additions & 2 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ use crate::platform_property_manager::PlatformPropertyManager;
#[async_trait]
pub trait ActionScheduler: Sync + Send + Unpin {
/// Returns the platform property manager.
async fn get_platform_property_manager(&self, instance_name: &str) -> Result<Arc<PlatformPropertyManager>, Error>;
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error>;

/// Adds an action to the scheduler for remote execution.
async fn add_action(&self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error>;
async fn add_action(
&self,
action_info: ActionInfo,
) -> Result<watch::Receiver<Arc<ActionState>>, Error>;

/// Find an existing action by its name.
async fn find_existing_action(
Expand Down
56 changes: 40 additions & 16 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
};
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState};
use nativelink_util::action_messages::{
ActionInfo, ActionInfoHashKey, ActionResult, ActionStage, ActionState,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::store_trait::Store;
use parking_lot::{Mutex, MutexGuard};
Expand Down Expand Up @@ -83,10 +85,14 @@ async fn get_action_from_store(
}
}

async fn validate_outputs_exist(cas_store: &Arc<dyn Store>, action_result: &ProtoActionResult) -> bool {
async fn validate_outputs_exist(
cas_store: &Arc<dyn Store>,
action_result: &ProtoActionResult,
) -> bool {
// Verify that output_files and output_directories are available in the cas.
let mut required_digests =
Vec::with_capacity(action_result.output_files.len() + action_result.output_directories.len());
let mut required_digests = Vec::with_capacity(
action_result.output_files.len() + action_result.output_directories.len(),
);
for digest in action_result
.output_files
.iter()
Expand All @@ -104,7 +110,10 @@ async fn validate_outputs_exist(cas_store: &Arc<dyn Store>, action_result: &Prot
required_digests.push(digest);
}

let Ok(sizes) = Pin::new(cas_store.as_ref()).has_many(&required_digests).await else {
let Ok(sizes) = Pin::new(cas_store.as_ref())
.has_many(&required_digests)
.await
else {
return false;
};
sizes.into_iter().all(|size| size.is_some())
Expand Down Expand Up @@ -142,11 +151,19 @@ impl CacheLookupScheduler {

#[async_trait]
impl ActionScheduler for CacheLookupScheduler {
async fn get_platform_property_manager(&self, instance_name: &str) -> Result<Arc<PlatformPropertyManager>, Error> {
self.action_scheduler.get_platform_property_manager(instance_name).await
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
self.action_scheduler
.get_platform_property_manager(instance_name)
.await
}

async fn add_action(&self, action_info: ActionInfo) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
async fn add_action(
&self,
action_info: ActionInfo,
) -> Result<watch::Receiver<Arc<ActionState>>, Error> {
if action_info.skip_cache_lookup {
// Cache lookup skipped, forward to the upstream.
return self.action_scheduler.add_action(action_info).await;
Expand All @@ -160,7 +177,9 @@ impl ActionScheduler for CacheLookupScheduler {
let scope_guard = {
let mut cache_check_actions = self.cache_check_actions.lock();
// Check this isn't a duplicate request first.
if let Some(rx) = subscribe_to_existing_action(&cache_check_actions, &action_info.unique_qualifier) {
if let Some(rx) =
subscribe_to_existing_action(&cache_check_actions, &action_info.unique_qualifier)
{
return Ok(rx);
}
cache_check_actions.insert(action_info.unique_qualifier.clone(), tx.clone());
Expand All @@ -185,11 +204,13 @@ impl ActionScheduler for CacheLookupScheduler {
let action_digest = current_state.action_digest();
let instance_name = action_info.instance_name().clone();
if let Some(action_result) =
get_action_from_store(Pin::new(ac_store.as_ref()), *action_digest, instance_name).await
get_action_from_store(Pin::new(ac_store.as_ref()), *action_digest, instance_name)
.await
{
if validate_outputs_exist(&cas_store, &action_result).await {
// Found in the cache, return the result immediately.
Arc::make_mut(&mut current_state).stage = ActionStage::CompletedFromCache(action_result);
Arc::make_mut(&mut current_state).stage =
ActionStage::CompletedFromCache(action_result);
let _ = tx.send(current_state);
return;
}
Expand All @@ -212,10 +233,11 @@ impl ActionScheduler for CacheLookupScheduler {
}
}
Err(err) => {
Arc::make_mut(&mut current_state).stage = ActionStage::Completed(ActionResult {
error: Some(err),
..Default::default()
});
Arc::make_mut(&mut current_state).stage =
ActionStage::Completed(ActionResult {
error: Some(err),
..Default::default()
});
let _ = tx.send(current_state);
}
}
Expand All @@ -234,7 +256,9 @@ impl ActionScheduler for CacheLookupScheduler {
}
}
// Cache skipped may be in the upstream scheduler.
self.action_scheduler.find_existing_action(unique_qualifier).await
self.action_scheduler
.find_existing_action(unique_qualifier)
.await
}

async fn clean_recently_completed_actions(&self) {}
Expand Down
11 changes: 8 additions & 3 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use crate::property_modifier_scheduler::PropertyModifierScheduler;
use crate::simple_scheduler::SimpleScheduler;
use crate::worker_scheduler::WorkerScheduler;

pub type SchedulerFactoryResults = (Option<Arc<dyn ActionScheduler>>, Option<Arc<dyn WorkerScheduler>>);
pub type SchedulerFactoryResults = (
Option<Arc<dyn ActionScheduler>>,
Option<Arc<dyn WorkerScheduler>>,
);

pub fn scheduler_factory(
scheduler_type_cfg: &SchedulerConfig,
Expand Down Expand Up @@ -95,14 +98,16 @@ fn inner_scheduler_factory(
// (ActionScheduler and WorkerScheduler) and we need to be able to know if the underlying scheduler
// has already been visited, not just the trait. `Any` could be used, but that'd require some rework
// of all the schedulers. This is the most simple way to do it. Rust's uintptr_t is usize.
let action_scheduler_uintptr: usize = Arc::as_ptr(action_scheduler).cast::<()>() as usize;
let action_scheduler_uintptr: usize =
Arc::as_ptr(action_scheduler).cast::<()>() as usize;
if !visited_schedulers.contains(&action_scheduler_uintptr) {
visited_schedulers.insert(action_scheduler_uintptr);
action_scheduler.clone().register_metrics(scheduler_metrics);
}
}
if let Some(worker_scheduler) = &scheduler.1 {
let worker_scheduler_uintptr: usize = Arc::as_ptr(worker_scheduler).cast::<()>() as usize;
let worker_scheduler_uintptr: usize =
Arc::as_ptr(worker_scheduler).cast::<()>() as usize;
if !visited_schedulers.contains(&worker_scheduler_uintptr) {
visited_schedulers.insert(worker_scheduler_uintptr);
worker_scheduler.clone().register_metrics(scheduler_metrics);
Expand Down
Loading

0 comments on commit 0d9e7b1

Please sign in to comment.