Skip to content

Commit

Permalink
Fix clippy errors for most other non-scheduler files
Browse files Browse the repository at this point in the history
  • Loading branch information
allada committed Jul 12, 2023
1 parent 5dc31e2 commit 264849b
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 64 deletions.
47 changes: 20 additions & 27 deletions cas/cas_main.rs
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;

use clap::Parser;
use futures::future::{select_all, BoxFuture, OptionFuture, TryFutureExt};
use json5;
use runfiles::Runfiles;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
Expand Down Expand Up @@ -155,7 +154,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

for server_cfg in cfg.servers {
let server = Server::builder();
let services = server_cfg.services.ok_or_else(|| "'services' must be configured")?;
let services = server_cfg.services.ok_or("'services' must be configured")?;

let server = server
// TODO(allada) This is only used so we can get 200 status codes to know if our service
Expand All @@ -165,7 +164,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
services
.ac
.map_or(Ok(None), |cfg| {
AcServer::new(&cfg, &store_manager).and_then(|v| {
AcServer::new(&cfg, &store_manager).map(|v| {
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
Expand All @@ -175,13 +174,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
.filter_map(into_encoding)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
Some(service)
})
})
.err_tip(|| "Could not create AC service")?,
Expand All @@ -190,7 +188,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
services
.cas
.map_or(Ok(None), |cfg| {
CasServer::new(&cfg, &store_manager).and_then(|v| {
CasServer::new(&cfg, &store_manager).map(|v| {
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
Expand All @@ -200,13 +198,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
.filter_map(into_encoding)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
Some(service)
})
})
.err_tip(|| "Could not create CAS service")?,
Expand All @@ -215,7 +212,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
services
.execution
.map_or(Ok(None), |cfg| {
ExecutionServer::new(&cfg, &action_schedulers, &store_manager).and_then(|v| {
ExecutionServer::new(&cfg, &action_schedulers, &store_manager).map(|v| {
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
Expand All @@ -225,13 +222,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
.filter_map(into_encoding)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
Some(service)
})
})
.err_tip(|| "Could not create Execution service")?,
Expand All @@ -240,7 +236,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
services
.bytestream
.map_or(Ok(None), |cfg| {
ByteStreamServer::new(&cfg, &store_manager).and_then(|v| {
ByteStreamServer::new(&cfg, &store_manager).map(|v| {
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
Expand All @@ -250,13 +246,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
.filter_map(into_encoding)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
Some(service)
})
})
.err_tip(|| "Could not create ByteStream service")?,
Expand All @@ -267,14 +262,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.capabilities
.as_ref()
// Borrow checker fighting here...
.map(|_| CapabilitiesServer::new(&services.capabilities.as_ref().unwrap(), &action_schedulers)),
.map(|_| CapabilitiesServer::new(services.capabilities.as_ref().unwrap(), &action_schedulers)),
)
.await
.map_or(Ok::<Option<CapabilitiesServer>, Error>(None), |server| {
Ok(Some(server?))
})
.err_tip(|| "Could not create Capabilities service")?
.and_then(|v| {
.map(|v| {
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
Expand All @@ -284,20 +279,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
.filter_map(into_encoding)
{
service = service.accept_compressed(encoding);
}
Some(service)
service
}),
)
.add_optional_service(
services
.worker_api
.map_or(Ok(None), |cfg| {
WorkerApiServer::new(&cfg, &worker_schedulers).and_then(|v| {
WorkerApiServer::new(&cfg, &worker_schedulers).map(|v| {
let mut service = v.into_service();
let send_algo = &server_cfg.compression.send_compression_algorithm;
if let Some(encoding) = into_encoding(&send_algo.unwrap_or(CompressionAlgorithm::None)) {
Expand All @@ -307,13 +301,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.compression
.accepted_compression_algorithms
.iter()
.map(into_encoding)
// Filter None values.
.filter_map(|v| v)
.filter_map(into_encoding)
{
service = service.accept_compressed(encoding);
}
Ok(Some(service))
Some(service)
})
})
.err_tip(|| "Could not create WorkerApi service")?,
Expand Down
2 changes: 0 additions & 2 deletions config/cas_server.rs
Expand Up @@ -16,9 +16,7 @@ use std::collections::HashMap;

use serde::Deserialize;

use schedulers;
use serde_utils::{convert_numeric_with_shellexpand, convert_string_with_shellexpand};
use stores;

/// Name of the store. This type will be used when referencing a store
/// in the `CasConfig::stores`'s map key.
Expand Down
4 changes: 2 additions & 2 deletions proto/gen_protos_tool.rs
Expand Up @@ -24,9 +24,9 @@ fn main() -> std::io::Result<()> {
let output_dir = PathBuf::from(matches.get_one::<String>("output_dir").unwrap());

let mut config = Config::new();
config.bytes(&["."]);
config.bytes(["."]);
tonic_build::configure()
.out_dir(&output_dir)
.out_dir(output_dir)
.compile_with_config(config, &paths, &["proto"])?;
Ok(())
}
18 changes: 9 additions & 9 deletions util/async_fixed_buffer.rs
Expand Up @@ -44,9 +44,9 @@ pin_project! {
}

/// Wakes and consumes the pending waker, if one is parked.
///
/// Takes the `Waker` out of `waker` (leaving `None`) and calls `wake()` on it.
/// If no waker is parked this is a no-op. Using `if let` rather than
/// `Option::map` avoids the clippy `option_map_unit_fn` lint, since the
/// closure would return `()`.
fn wake(waker: &mut Option<Waker>) {
    if let Some(w) = waker.take() {
        w.wake();
    }
}

fn park(waker: &mut Option<Waker>, cx: &Context<'_>) {
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<T: AsRef<[u8]> + Unpin> AsyncRead for AsyncFixedBuf<T> {

let num_read = me.inner.read_and_copy_bytes(buf.initialize_unfilled());
buf.advance(num_read);
if num_read <= 0 {
if num_read == 0 {
if me.received_eof.load(Ordering::Relaxed) {
wake(&mut waker);
return Poll::Ready(Ok(()));
Expand All @@ -130,8 +130,8 @@ impl<T: AsRef<[u8]> + Unpin> AsyncRead for AsyncFixedBuf<T> {
}
// Have not yet reached EOF and have no data to read,
// so we need to try and wake our writer, replace the waker and wait.
if num_read <= 0 {
park(&mut waker, &cx);
if num_read == 0 {
park(&mut waker, cx);
return Poll::Pending;
}
wake(&mut waker);
Expand All @@ -151,7 +151,7 @@ impl<T: AsMut<[u8]> + AsRef<[u8]>> AsyncWrite for AsyncFixedBuf<T> {
"Receiver disconnected",
)));
}
if buf.len() == 0 {
if buf.is_empty() {
// EOF happens when a zero byte message is sent.
me.received_eof.store(true, Ordering::Relaxed);
me.did_shutdown.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -183,7 +183,7 @@ impl<T: AsMut<[u8]> + AsRef<[u8]>> AsyncWrite for AsyncFixedBuf<T> {
return Poll::Pending;
}

park(&mut waker, &cx);
park(&mut waker, cx);
Poll::Pending
}
}
Expand All @@ -196,7 +196,7 @@ impl<T: AsMut<[u8]> + AsRef<[u8]>> AsyncWrite for AsyncFixedBuf<T> {
wake(&mut waker);
return Poll::Ready(Ok(()));
}
park(&mut waker, &cx);
park(&mut waker, cx);
Poll::Pending
}

Expand Down Expand Up @@ -250,7 +250,7 @@ pin_project! {
fn drop(mut this: Pin<&mut Self>) {
if !this.did_shutdown.load(Ordering::Relaxed) {
let close_fut = this.close_fut.take().unwrap();
tokio::spawn(async move { close_fut.await });
tokio::spawn(close_fut);
}
}
}
Expand Down
26 changes: 13 additions & 13 deletions util/buf_channel.rs
Expand Up @@ -41,7 +41,7 @@ pub fn make_buf_channel_pair() -> (DropCloserWriteHalf, DropCloserReadHalf) {
close_rx,
},
DropCloserReadHalf {
rx: rx,
rx,
partial: None,
close_tx: Some(close_tx),
close_after_size: u64::MAX,
Expand Down Expand Up @@ -101,7 +101,7 @@ impl DropCloserWriteHalf {
match reader.next().await {
Some(maybe_chunk) => {
let chunk = maybe_chunk.err_tip(|| "Failed to forward message")?;
if chunk.len() == 0 {
if chunk.is_empty() {
// Don't send EOF here. We instead rely on None result to be EOF.
continue;
}
Expand Down Expand Up @@ -210,7 +210,7 @@ impl DropCloserReadHalf {
}
}

pub async fn peek<'a>(&'a mut self) -> &'a Result<Bytes, Error> {
pub async fn peek(&mut self) -> &Result<Bytes, Error> {
assert!(
self.close_after_size == u64::MAX,
"Can't call peek() when take() was called"
Expand All @@ -219,7 +219,7 @@ impl DropCloserReadHalf {
self.partial = Some(self.recv().await);
}
if let Some(result) = &self.partial {
return &result;
return result;
}
unreachable!();
}
Expand All @@ -240,7 +240,7 @@ impl DropCloserReadHalf {
.await
.err_tip(|| "Failed to recv first chunk in collect_all_with_size_hint")?;

if first_chunk.len() == 0 {
if first_chunk.is_empty() {
return Ok(first_chunk);
}

Expand All @@ -249,7 +249,7 @@ impl DropCloserReadHalf {
.await
.err_tip(|| "Failed to recv second chunk in collect_all_with_size_hint")?;

if second_chunk.len() == 0 {
if second_chunk.is_empty() {
return Ok(first_chunk);
}
(first_chunk, second_chunk)
Expand All @@ -264,7 +264,7 @@ impl DropCloserReadHalf {
.recv()
.await
.err_tip(|| "Failed to recv in collect_all_with_size_hint")?;
if chunk.len() == 0 {
if chunk.is_empty() {
break; // EOF.
}
buf.put(chunk);
Expand All @@ -288,7 +288,7 @@ impl DropCloserReadHalf {
}
assert!(partial.is_none(), "Partial should have been consumed during the recv()");
let local_partial = chunk.split_off(desired_size - current_size);
*partial = if local_partial.len() == 0 {
*partial = if local_partial.is_empty() {
None
} else {
Some(Ok(local_partial))
Expand All @@ -303,16 +303,16 @@ impl DropCloserReadHalf {
// we will then go the slow path and actually copy our data.
let mut first_chunk = self.recv().await.err_tip(|| "During first buf_channel::take()")?;
populate_partial_if_needed(0, size, &mut first_chunk, &mut self.partial);
if first_chunk.len() == 0 || first_chunk.len() >= size {
if first_chunk.is_empty() || first_chunk.len() >= size {
assert!(
first_chunk.len() == 0 || first_chunk.len() == size,
first_chunk.is_empty() || first_chunk.len() == size,
"Length should be exactly size here"
);
return Ok(first_chunk);
}

let mut second_chunk = self.recv().await.err_tip(|| "During second buf_channel::take()")?;
if second_chunk.len() == 0 {
if second_chunk.is_empty() {
assert!(
first_chunk.len() <= size,
"Length should never be larger than size here"
Expand All @@ -328,7 +328,7 @@ impl DropCloserReadHalf {

loop {
let mut chunk = self.recv().await.err_tip(|| "During buf_channel::take()")?;
if chunk.len() == 0 {
if chunk.is_empty() {
break; // EOF.
}

Expand All @@ -353,7 +353,7 @@ impl Stream for DropCloserReadHalf {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Box::pin(self.recv()).as_mut().poll(cx).map(|result| match result {
Ok(bytes) => {
if bytes.len() == 0 {
if bytes.is_empty() {
return None;
}
Some(Ok(bytes))
Expand Down
3 changes: 1 addition & 2 deletions util/fs.rs
Expand Up @@ -20,7 +20,6 @@ use std::task::Context;
use std::task::Poll;

use error::{make_err, Code, Error, ResultExt};
use log;
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom};
use tokio::sync::{Semaphore, SemaphorePermit};

Expand Down Expand Up @@ -214,7 +213,7 @@ pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir<'static>, Error>
.map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?;
Ok(ReadDir {
permit,
inner: tokio::fs::read_dir(path).await.map_err(|e| Into::<Error>::into(e))?,
inner: tokio::fs::read_dir(path).await.map_err(Into::<Error>::into)?,
})
}

Expand Down

0 comments on commit 264849b

Please sign in to comment.