diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 26e90758c6afe..24214784c9e06 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -3372,7 +3372,7 @@ mod tests { let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await; for handle in handles { - handle.await.expect("must succeed"); + handle.await; } } diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs index 57ba0352e0d9b..17e4e2fcae09f 100644 --- a/src/adapter/src/catalog/apply.rs +++ b/src/adapter/src/catalog/apply.rs @@ -1418,9 +1418,9 @@ impl CatalogState { } // Wait for a view to be ready. - let (handle, _idx, remaining) = future::select_all(handles).await; + let (selected, _idx, remaining) = future::select_all(handles).await; handles = remaining; - let (id, global_id, res) = handle.expect("must join"); + let (id, global_id, res) = selected; let mut insert_cached_expr = |cached_expr| { if let Some(cached_expr) = cached_expr { local_expression_cache.insert_cached_expression(global_id, cached_expr); diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 200bc1f061d60..1ea9d2cefe314 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -2553,15 +2553,12 @@ impl Coordinator { let retractions_res = futures::future::join_all(retraction_tasks).await; for retractions in retractions_res { - let retractions = retractions.expect("cannot fail to fetch snapshot"); builtin_table_updates.push(retractions); } let audit_join_start = Instant::now(); info!("startup: coordinator init: bootstrap: join audit log deserialization beginning"); - let audit_log_updates = audit_log_task - .await - .expect("cannot fail to fetch audit log updates"); + let audit_log_updates = audit_log_task.await; let audit_log_builtin_table_updates = self .catalog() .state() diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index ff844043eb442..1b437368a9b1e 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -252,16 +252,7 @@ impl Coordinator { spawn(|| "sequence_staged", async move { tokio::select! 
{ res = handle => { - let next = match res { - Ok(next) => return_if_err!(next, ctx), - Err(err) => { - tracing::error!("sequence_staged join error {err}"); - ctx.retire(Err(AdapterError::Internal( - "sequence_staged join error".into(), - ))); - return; - } - }; + let next = return_if_err!(res, ctx); f(ctx, next); } _ = rx, if cancel_enabled => { diff --git a/src/adapter/src/explain/insights.rs b/src/adapter/src/explain/insights.rs index 0763ded235c8c..9e67b157eddc6 100644 --- a/src/adapter/src/explain/insights.rs +++ b/src/adapter/src/explain/insights.rs @@ -132,7 +132,7 @@ impl PlanInsights { }); for task in tasks { let res = task.await; - let Ok(Ok((name, plan))) = res else { + let Ok((name, plan)) = res else { continue; }; let (plan, _, _) = plan.unapply(); diff --git a/src/adapter/src/webhook.rs b/src/adapter/src/webhook.rs index 843a9e1c0958e..49b0adfa353c4 100644 --- a/src/adapter/src/webhook.rs +++ b/src/adapter/src/webhook.rs @@ -223,7 +223,7 @@ impl AppendWebhookValidator { .map_err(|e| { tracing::error!("Failed to run validation for webhook, {e}"); AppendWebhookError::ValidationError - })??; + })?; valid } diff --git a/src/aws-util/src/s3_uploader.rs b/src/aws-util/src/s3_uploader.rs index 1868c86fedf92..89a8f24a6a61c 100644 --- a/src/aws-util/src/s3_uploader.rs +++ b/src/aws-util/src/s3_uploader.rs @@ -20,7 +20,7 @@ use bytes::{Bytes, BytesMut}; use bytesize::ByteSize; use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; -use mz_ore::task::{JoinHandle, JoinHandleExt, spawn}; +use mz_ore::task::{JoinHandle, spawn}; /// A multi part uploader which can upload a single object across multiple parts /// and keeps track of state to eventually finish the upload process. @@ -226,7 +226,7 @@ impl S3MultiPartUploader { let mut parts: Vec = Vec::with_capacity(self.upload_handles.len()); for handle in self.upload_handles { - let (etag, part_num) = handle.wait_and_assert_finished().await?; + let (etag, part_num) = handle.await?; match etag { Some(etag) => { parts.push( diff --git a/src/balancerd/tests/server.rs b/src/balancerd/tests/server.rs index 90acb6aeef064..d7ea9b8e13423 100644 --- a/src/balancerd/tests/server.rs +++ b/src/balancerd/tests/server.rs @@ -334,7 +334,7 @@ async fn test_balancer() { handles.push(handle); } for handle in handles { - handle.await.unwrap(); + handle.await; } let end_auth_count = *frontegg_server.auth_requests.lock().unwrap(); // We expect that the auth count increased by fewer than the number of connections. diff --git a/src/catalog/tests/debug.rs b/src/catalog/tests/debug.rs index 742116ec654cd..bca01b7a06aa4 100644 --- a/src/catalog/tests/debug.rs +++ b/src/catalog/tests/debug.rs @@ -589,7 +589,7 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) { .await .unwrap(); - state_handle.await.unwrap(); + state_handle.await; let mut state = state_builder .clone() @@ -625,7 +625,7 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) { .await .unwrap(); - state_handle.await.unwrap(); + state_handle.await; let configs = state_builder .clone() diff --git a/src/catalog/tests/open.rs b/src/catalog/tests/open.rs index 28ed7c9214524..a68b967a99f5e 100644 --- a/src/catalog/tests/open.rs +++ b/src/catalog/tests/open.rs @@ -970,7 +970,7 @@ async fn test_concurrent_open(state_builder: TestCatalogStateBuilder) { .unwrap() .0; - state_handle.await.unwrap(); + state_handle.await; // Open again to ensure that we didn't commit an invalid retraction. 
let _state = state_builder diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index 7a5efafc41c1e..e2df8e8df0a7b 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -3343,14 +3343,10 @@ async fn webhook_concurrent_actions() { tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Stop the threads. keep_sending.store(false, std::sync::atomic::Ordering::Relaxed); - let results = poster.await.expect("thread panicked!"); + let results = poster.await; // Inspect the results. - let mut results = results - .into_iter() - .collect::, _>>() - .expect("no join failures") - .into_iter(); + let mut results = results.into_iter().collect::>().into_iter(); for _ in 0..num_requests_before_drop { let response = results.next().expect("element"); @@ -3463,8 +3459,7 @@ fn webhook_concurrency_limit() { } let results = server .runtime() - .block_on(futures::future::try_join_all(handles)) - .expect("failed to wait for requests"); + .block_on(futures::future::join_all(handles)); let successes = results .iter() diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index b59b0b7b5b388..fc643a5b0637c 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -34,7 +34,7 @@ use mz_ore::collections::CollectionExt; use mz_ore::now::{EpochMillis, NOW_ZERO, NowFn}; use mz_ore::result::ResultExt; use mz_ore::retry::Retry; -use mz_ore::task::{self, AbortOnDropHandle, JoinHandleExt}; +use mz_ore::task::{self, AbortOnDropHandle}; use mz_ore::{assert_contains, assert_err, assert_ok}; use mz_pgrepr::UInt4; use mz_repr::Timestamp; @@ -194,7 +194,7 @@ async fn test_no_block() { .expect("server unexpectedly closed channel"); println!("test_no_block: joining task"); - slow_task.await.unwrap(); + slow_task.await; }; tokio::time::timeout(Duration::from_secs(120), test_case) @@ -297,7 +297,7 @@ async fn test_drop_connection_race() { .expect("server unexpectedly closed channel"); info!("test_drop_connection_race: asserting response"); - let source_res = source_task.await.unwrap(); + let source_res = source_task.into_tokio_handle().await.unwrap(); assert_contains!( source_res.unwrap_err().to_string(), "unknown catalog item 'conn'" @@ -3746,7 +3746,7 @@ async fn test_cancel_linearize_read_then_writes() { ) .await; res.unwrap(); - handle.await.unwrap(); + handle.await; } // Test that builtin objects are created in the schemas they advertise in builtin.rs. @@ -3817,7 +3817,7 @@ async fn test_serialized_ddl_serial() { let mut successes = 0; let mut errors = 0; for handle in handles { - let result = handle.await.unwrap(); + let result = handle.await; match result { Ok(_) => { successes += 1; @@ -3869,11 +3869,11 @@ async fn test_serialized_ddl_cancel() { // Cancel the pending statement (this uses different cancellation logic and is the actual thing // we are trying to test here). cancel2.cancel_query(tokio_postgres::NoTls).await.unwrap(); - let err = handle2.await.unwrap(); + let err = handle2.await; assert_contains!(err.to_string(), "canceling statement due to user request"); // Cancel the in-progress statement. 
cancel1.cancel_query(tokio_postgres::NoTls).await.unwrap(); - let err = handle1.await.unwrap(); + let err = handle1.await; assert_contains!(err.to_string(), "canceling statement due to user request"); // The mz_sleep calls above cause this test to not exit until the optimization tasks have fully diff --git a/src/frontegg-mock/src/main.rs b/src/frontegg-mock/src/main.rs index d0bf92158841a..662698b846318 100644 --- a/src/frontegg-mock/src/main.rs +++ b/src/frontegg-mock/src/main.rs @@ -137,6 +137,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { println!("frontegg-mock listening..."); println!(" HTTP address: {}", server.base_url); - server.handle.await??; + server.handle.await?; anyhow::bail!("serving tasks unexpectedly exited"); } diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs index c5f4648700176..ee7e22dda173e 100644 --- a/src/mysql-util/src/tunnel.rs +++ b/src/mysql-util/src/tunnel.rs @@ -16,7 +16,7 @@ use std::time::Duration; use mz_ore::future::{InTask, TimeoutError}; use mz_ore::option::OptionExt; -use mz_ore::task::{JoinHandleExt, spawn}; +use mz_ore::task::spawn; use mz_repr::CatalogItemId; use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig}; use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager}; @@ -371,9 +371,7 @@ impl Config { opts_builder: OptsBuilder, ) -> Result { let connection_future = if let InTask::Yes = self.in_task { - spawn(|| "mysql_connect".to_string(), Conn::new(opts_builder)) - .abort_on_drop() - .wait_and_assert_finished() + Box::pin(spawn(|| "mysql_connect".to_string(), Conn::new(opts_builder)).abort_on_drop()) } else { Conn::new(opts_builder) }; diff --git a/src/ore/src/future.rs b/src/ore/src/future.rs index ffc10a16e6778..f479337786d7a 100644 --- a/src/ore/src/future.rs +++ b/src/ore/src/future.rs @@ -35,7 +35,7 @@ use pin_project::pin_project; use tokio::task::futures::TaskLocalFuture; use tokio::time::{self, Duration, Instant}; -use crate::task::{self, JoinHandleExt}; +use crate::task; /// Whether or not to run the future in `run_in_task_if` in a task. #[derive(Clone, Copy, Debug)] @@ -126,7 +126,7 @@ where T: Send + 'static, T::Output: Send + 'static, { - task::spawn(nc, self).wait_and_assert_finished().await + task::spawn(nc, self).await } async fn run_in_task_if(self, in_task: InTask, nc: NameClosure) -> T::Output diff --git a/src/ore/src/task.rs b/src/ore/src/task.rs index 590e7880bd6d7..7c4ae84baf7f1 100644 --- a/src/ore/src/task.rs +++ b/src/ore/src/task.rs @@ -47,148 +47,118 @@ use std::task::{Context, Poll}; use futures::FutureExt; use tokio::runtime::{Handle, Runtime}; -use tokio::task::{self, JoinError, JoinHandle as TokioJoinHandle}; +use tokio::task::{self, JoinHandle as TokioJoinHandle}; /// Wraps a [`JoinHandle`] to abort the underlying task when dropped. #[derive(Debug)] -pub struct AbortOnDropHandle(TokioJoinHandle); +pub struct AbortOnDropHandle(JoinHandle); impl AbortOnDropHandle { /// Checks if the task associated with this [`AbortOnDropHandle`] has finished.a pub fn is_finished(&self) -> bool { - self.0.is_finished() + self.0.inner.is_finished() } - // Note: adding an `abort(&self)` method here is incorrect, please see `unpack_join_result`. + // Note: adding an `abort(&self)` method here is incorrect; see the comment in JoinHandle::poll. 
} impl Drop for AbortOnDropHandle { fn drop(&mut self) { - self.0.abort(); + self.0.inner.abort(); } } impl Future for AbortOnDropHandle { - type Output = Result; + type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.0.poll_unpin(cx) } } -/// Wraps a tokio `JoinHandle` and provides 4 exclusive (i.e. they take `self` ownership) +/// Wraps a tokio `JoinHandle` that has never been cancelled. +/// This allows it to have an infallible implementation of [Future], +/// and provides some exclusive (i.e. they take `self` ownership) /// operations: /// /// - `abort_on_drop`: create an `AbortOnDropHandle` that will automatically abort the task /// when the handle is dropped. -/// - `JoinHandleExt::wait_and_assert_finished`: wait for the task to finish and return its return value. /// - `JoinHandleExt::abort_and_wait`: abort the task and wait for it to be finished. /// - `into_tokio_handle`: turn it into an ordinary tokio `JoinHandle`. #[derive(Debug)] -pub struct JoinHandle(TokioJoinHandle); +pub struct JoinHandle { + inner: TokioJoinHandle, + runtime_shutting_down: bool, +} + +impl JoinHandle { + /// Wrap a tokio join handle. This is intentionally private, so we can statically guarantee + /// that the inner join handle has not been aborted. + fn new(handle: TokioJoinHandle) -> Self { + Self { + inner: handle, + runtime_shutting_down: false, + } + } +} impl Future for JoinHandle { - type Output = Result; + type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.0.poll_unpin(cx) + if self.runtime_shutting_down { + return Poll::Pending; + } + match self.inner.poll_unpin(cx) { + Poll::Ready(Ok(res)) => Poll::Ready(res), + Poll::Ready(Err(err)) => { + match err.try_into_panic() { + Ok(panic) => std::panic::resume_unwind(panic), + Err(err) => { + assert!( + err.is_cancelled(), + "join errors are either cancellations or panics" + ); + // Because `JoinHandle` and `AbortOnDropHandle` don't + // offer an `abort` method, this can only happen if the runtime is + // shutting down, which means this `pending` won't cause a deadlock + // because Tokio drops all outstanding futures on shutdown. + // (In multi-threaded runtimes, not all threads drop futures simultaneously, + // so it is possible for a future on one thread to observe the drop of a future + // on another thread, before it itself is dropped.) + self.runtime_shutting_down = true; + Poll::Pending + } + } + } + Poll::Pending => Poll::Pending, + } } } impl JoinHandle { /// Create an [`AbortOnDropHandle`] from this [`JoinHandle`]. pub fn abort_on_drop(self) -> AbortOnDropHandle { - AbortOnDropHandle(self.0) + AbortOnDropHandle(self) } - /// Checks if the task associated with this [`JoinHandle`] has finished.a + /// Checks if the task associated with this [`JoinHandle`] has finished. pub fn is_finished(&self) -> bool { - self.0.is_finished() - } - - /// Checks if the task associated with this [`JoinHandle`] has finished.a - pub fn into_tokio_handle(self) -> TokioJoinHandle { - self.0 + self.inner.is_finished() } - // Note: adding an `abort(&self)` method here is incorrect, please see `unpack_join_result`. -} - -/// Extension methods for [`JoinHandle`] and [`AbortOnDropHandle`]. -#[async_trait::async_trait] -pub trait JoinHandleExt: Future> { - /// Waits for the task to finish, resuming the unwind if the task panicked. 
- /// - /// Because this takes ownership of `self`, and [`JoinHandle`] and - /// [`AbortOnDropHandle`] don't offer `abort` methods, this can avoid - /// worrying about aborted tasks. - async fn wait_and_assert_finished(self) -> T; - /// Aborts the task, then waits for it to complete. - async fn abort_and_wait(self); -} - -async fn unpack_join_result(res: Result) -> T { - match res { - Ok(val) => val, - Err(err) => match err.try_into_panic() { - Ok(panic) => std::panic::resume_unwind(panic), - Err(_) => { - // Because `JoinHandle` and `AbortOnDropHandle` don't - // offer `abort` method, this can only happen if the runtime is - // shutting down, which means this `pending` won't cause a deadlock - // because Tokio drops all outstanding futures on shutdown. - // (In multi-threaded runtimes, not all threads drop futures simultaneously, - // so it is possible for a future on one thread to observe the drop of a future - // on another thread, before it itself is dropped.) - // - // Instead, we yield to tokio runtime. A single `yield_now` is not - // sufficient as a `select!` or `FuturesUnordered` may - // poll this multiple times during shutdown. - std::future::pending().await - } - }, + pub async fn abort_and_wait(self) { + self.inner.abort(); + let _ = self.inner.await; } -} - -#[async_trait::async_trait] -impl JoinHandleExt for JoinHandle { - async fn wait_and_assert_finished(self) -> T { - unpack_join_result(self.await).await - } - - async fn abort_and_wait(self) { - self.0.abort(); - let _ = self.await; - } -} -#[async_trait::async_trait] -impl + Send> JoinHandleExt - for tracing::instrument::Instrumented -{ - async fn wait_and_assert_finished(self) -> T { - unpack_join_result(self.await).await - } - - async fn abort_and_wait(self) { - self.abort_and_wait().await - } -} - -#[async_trait::async_trait] -impl JoinHandleExt for AbortOnDropHandle { - // Because we are sure the `AbortOnDropHandle` still exists when we call - // `unpack_join_result` is called, we know `abort` hasn't been called, so its - // safe to call. - async fn wait_and_assert_finished(self) -> T { - unpack_join_result(self.await).await + /// Unwrap this handle into a standard [tokio::task::JoinHandle]. + pub fn into_tokio_handle(self) -> TokioJoinHandle { + self.inner } - async fn abort_and_wait(self) { - self.0.abort(); - let _ = self.await; - } + // Note: adding an `abort(&self)` method here is incorrect; see the comment in JoinHandle::poll. } /// Spawns a new asynchronous task with a name. 
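(Editor's note, not part of the diff.) The hunks above rework `mz_ore::task::JoinHandle` so that it implements `Future<Output = T>` directly: a panicking task has its panic resumed at the await point, and a cancelled task (which, since the new handle exposes no `abort`, can only happen while the runtime is shutting down) leaves the caller pending instead of surfacing a `JoinError`. A minimal caller-side sketch of the resulting API, using a hypothetical `compute_value` helper:

    use mz_ore::task;

    async fn compute_value() -> u64 {
        // Stand-in for real work done on the spawned task.
        42
    }

    async fn example() -> u64 {
        // `spawn` still takes a name closure and a future, but the returned
        // handle now resolves straight to the task's output.
        let handle = task::spawn(|| "example_task", compute_value());
        // No `.expect("must join")` or `wait_and_assert_finished()` needed:
        // a panic is propagated here, and cancellation (runtime shutdown)
        // keeps this await pending rather than returning an error.
        handle.await
    }

Call sites that still want tokio's fallible join semantics (for example, tests that assert a spawned task panicked) convert back with `into_tokio_handle()`, as the `persist-client` and `txn-wal` test changes in this diff do.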
@@ -222,7 +192,7 @@ where Fut::Output: Send + 'static, { #[allow(clippy::disallowed_methods)] - JoinHandle( + JoinHandle::new( task::Builder::new() .name(&format!("{}:{}", Handle::current().id(), nc().as_ref())) .spawn(future) @@ -269,7 +239,7 @@ where Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, { - JoinHandle( + JoinHandle::new( task::Builder::new() .name(&format!("{}:{}", Handle::current().id(), nc().as_ref())) .spawn_blocking(function) diff --git a/src/persist-cli/src/maelstrom.rs b/src/persist-cli/src/maelstrom.rs index 13d59245cd7cd..79ad41dc0b354 100644 --- a/src/persist-cli/src/maelstrom.rs +++ b/src/persist-cli/src/maelstrom.rs @@ -71,5 +71,4 @@ pub async fn run(args: Args) -> Result<(), anyhow::Error> }, ) .await - .expect("task failed") } diff --git a/src/persist-cli/src/open_loop.rs b/src/persist-cli/src/open_loop.rs index d695e3a47c073..999b045320113 100644 --- a/src/persist-cli/src/open_loop.rs +++ b/src/persist-cli/src/open_loop.rs @@ -252,8 +252,7 @@ where .in_scope(|| data_generator.gen_batch(usize::cast_from(batch_idx))) }, ) - .await - .expect("task failed"); + .await; trace!("data generator {} made a batch", idx); let batch = match batch { Some(x) => x, @@ -432,19 +431,19 @@ where } for handle in generator_handles { - match handle.await? { + match handle.await { Ok(finished) => info!("{}", finished), Err(e) => error!("error: {:?}", e), } } for handle in write_handles { - match handle.await? { + match handle.await { Ok(finished) => info!("{}", finished), Err(e) => error!("error: {:?}", e), } } for handle in read_handles { - match handle.await? { + match handle.await { Ok((finished, _)) => info!("{}", finished), Err(e) => error!("error: {:?}", e), } @@ -661,7 +660,7 @@ mod raw_persist_benchmark { self.tx.take().expect("already finished"); for handle in self.handles.drain(..) { - let () = handle.await?; + let () = handle.await; } Ok(()) diff --git a/src/persist-client/benches/plumbing.rs b/src/persist-client/benches/plumbing.rs index a4040cbaea6b7..87426b6b0f272 100644 --- a/src/persist-client/benches/plumbing.rs +++ b/src/persist-client/benches/plumbing.rs @@ -120,7 +120,7 @@ fn bench_consensus_compare_and_set_all_iters( handles.push(handle); } for handle in handles.into_iter() { - runtime.block_on(handle).expect("task failed"); + runtime.block_on(handle); } start.elapsed() diff --git a/src/persist-client/benches/porcelain.rs b/src/persist-client/benches/porcelain.rs index a83f06ab53803..f5796a9957293 100644 --- a/src/persist-client/benches/porcelain.rs +++ b/src/persist-client/benches/porcelain.rs @@ -192,7 +192,7 @@ async fn bench_write_to_listen_one_iter( // Now wait for the listener task to clean up so it doesn't leak into other // benchmarks. - listen.await.expect("listener task failed"); + listen.await; Ok(batch_count) } diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index d8eabee2ba765..b16df1451b29f 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -274,7 +274,7 @@ where ) .instrument(write_span), ); - let part = handle.await.expect("part write task failed"); + let part = handle.await; parts.push(RunPart::Single(part)); } } @@ -1254,8 +1254,7 @@ impl BatchParts { ) }) .instrument(debug_span!("batch::encode_part")) - .await - .expect("part encode task failed"); + .await; // Can't use the `CodecMetrics::encode` helper because of async. 
metrics.codecs.batch.encode_count.inc(); metrics diff --git a/src/persist-client/src/cache.rs b/src/persist-client/src/cache.rs index cc9e14b03320d..3c1b3870ed0fd 100644 --- a/src/persist-client/src/cache.rs +++ b/src/persist-client/src/cache.rs @@ -815,6 +815,7 @@ mod tests { ) .await }) + .into_tokio_handle() .await; assert_err!(res); assert_eq!(states.initialized_count(), 0); diff --git a/src/persist-client/src/cli/bench.rs b/src/persist-client/src/cli/bench.rs index 19ecb49d628bc..a38e0e9c6eadc 100644 --- a/src/persist-client/src/cli/bench.rs +++ b/src/persist-client/src/cli/bench.rs @@ -104,8 +104,7 @@ async fn bench_s3(args: &S3FetchArgs) -> Result<(), anyhow::Error> { start.elapsed() }, ) - .await - .unwrap(); + .await; ( key, buf_len, @@ -116,7 +115,7 @@ async fn bench_s3(args: &S3FetchArgs) -> Result<(), anyhow::Error> { fetches.push(fetch); } for fetch in fetches { - let (key, size_bytes, fetch_secs, parse_secs) = fetch.await.unwrap(); + let (key, size_bytes, fetch_secs, parse_secs) = fetch.await; println!( "{},{},{},{},{}", iter, key, size_bytes, fetch_secs, parse_secs diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 625c0f752a936..cf315bfc3c7f5 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -455,16 +455,14 @@ where .compaction .seconds .inc_by(start.elapsed().as_secs_f64()); - let res = res - .map_err(|e| { - metrics.compaction.timed_out.inc(); - anyhow!( - "compaction timed out after {}s: {}", - timeout.as_secs_f64(), - e - ) - })? - .map_err(|e| anyhow!(e))?; + let res = res.map_err(|e| { + metrics.compaction.timed_out.inc(); + anyhow!( + "compaction timed out after {}s: {}", + timeout.as_secs_f64(), + e + ) + })?; match res { Ok(maintenance) => Ok(maintenance), diff --git a/src/persist-client/src/internal/gc.rs b/src/persist-client/src/internal/gc.rs index 9c63e0f1762e3..45ae5a042ac5b 100644 --- a/src/persist-client/src/internal/gc.rs +++ b/src/persist-client/src/internal/gc.rs @@ -168,7 +168,6 @@ where .await }) .await - .expect("gc_and_truncate failed") }; machine.applier.metrics.gc.finished.inc(); machine.applier.shard_metrics.gc_finished.inc(); diff --git a/src/persist-client/src/internal/maintenance.rs b/src/persist-client/src/internal/maintenance.rs index 692cb03ec98c9..e9cf0c69bbe0c 100644 --- a/src/persist-client/src/internal/maintenance.rs +++ b/src/persist-client/src/internal/maintenance.rs @@ -135,7 +135,6 @@ impl RoutineMaintenance { ); machine.add_rollup_for_current_seqno().await }) - .map(Result::unwrap_or_default) .boxed(), ); } diff --git a/src/persist-client/src/internal/merge.rs b/src/persist-client/src/internal/merge.rs index 4cb60301c015c..88f1d072e9004 100644 --- a/src/persist-client/src/internal/merge.rs +++ b/src/persist-client/src/internal/merge.rs @@ -7,7 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use mz_ore::task::{JoinHandle, JoinHandleExt}; +use mz_ore::task::JoinHandle; use std::fmt::{Debug, Formatter}; use std::mem; use std::ops::{Deref, DerefMut}; @@ -178,7 +178,7 @@ impl Pending { pub async fn into_result(self) -> T { match self { - Pending::Writing(h) => h.wait_and_assert_finished().await, + Pending::Writing(h) => h.await, Pending::Blocking => panic!("block_until_ready cancelled?"), Pending::Finished(t) => t, } diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 0c8613c674371..115af790859fd 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -4406,6 +4406,7 @@ pub(crate) mod tests { .expect_compare_and_append_batch(&mut [], current, current + 1) .await; }) + .into_tokio_handle() .await } diff --git a/src/persist-client/src/internal/watch.rs b/src/persist-client/src/internal/watch.rs index 08e764576255d..a1a628a1855bf 100644 --- a/src/persist-client/src/internal/watch.rs +++ b/src/persist-client/src/internal/watch.rs @@ -143,9 +143,9 @@ mod tests { use futures_task::noop_waker; use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; + use mz_ore::assert_none; use mz_ore::cast::CastFrom; use mz_ore::metrics::MetricsRegistry; - use mz_ore::{assert_none, assert_ok}; use timely::progress::Antichain; use crate::cache::StateCache; @@ -272,10 +272,10 @@ mod tests { }) .collect::>(); for watch in watches { - assert_ok!(watch.await); + watch.await; } for write in writes { - assert_ok!(write.await); + write.await; } } diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs index 48d81e29a4755..050062e4ec68a 100644 --- a/src/persist-client/src/iter.rs +++ b/src/persist-client/src/iter.rs @@ -26,7 +26,7 @@ use futures_util::StreamExt; use futures_util::stream::FuturesUnordered; use itertools::Itertools; use mz_ore::soft_assert_eq_or_log; -use mz_ore::task::{JoinHandle, JoinHandleExt}; +use mz_ore::task::JoinHandle; use mz_persist::indexed::encoding::BlobTraceUpdates; use mz_persist::location::Blob; use mz_persist::metrics::ColumnarMetrics; @@ -597,7 +597,7 @@ where let wrong_sort = data.run_meta.order != Some(RunOrder::Structured); let fetch_result: anyhow::Result> = match task.take() { - Some(handle) => handle.wait_and_assert_finished().await, + Some(handle) => handle.await, None => { data.clone() .fetch( diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 6cc0b95b65d3e..9335679e29387 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -1919,7 +1919,7 @@ mod tests { } for handle in handles { - let () = handle.await.expect("task failed"); + let () = handle.await; } let expected = data.records().collect::>(); @@ -2028,9 +2028,7 @@ mod tests { .expect("handle should have unexpired state"); read.expire().await; for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) { - let () = read_heartbeat_task - .await - .expect("task should shutdown cleanly"); + let () = read_heartbeat_task.await; } } diff --git a/src/persist-client/src/operators/shard_source.rs b/src/persist-client/src/operators/shard_source.rs index 082623e38165f..45a61b2977cb2 100644 --- a/src/persist-client/src/operators/shard_source.rs +++ b/src/persist-client/src/operators/shard_source.rs @@ -402,7 +402,6 @@ where } }) .await - .expect("reader creation shouldn't panic") .expect("could not open persist shard"); // Wait for the start signal only after we have obtained a read handle. 
This makes "cannot @@ -638,7 +637,6 @@ where } }) .await - .expect("fetcher creation shouldn't panic") .expect("shard codecs should not change"); while let Some(event) = descs_input.next().await { diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs index cea885b06f239..98fe1ecc00dee 100644 --- a/src/persist-client/src/write.rs +++ b/src/persist-client/src/write.rs @@ -1400,6 +1400,6 @@ mod tests { tx.send(next_upper).expect("send failed"); } - task.await.expect("await failed"); + task.await; } } diff --git a/src/persist/src/location.rs b/src/persist/src/location.rs index 2d1af0deb89d3..977cfb4c72363 100644 --- a/src/persist/src/location.rs +++ b/src/persist/src/location.rs @@ -449,7 +449,7 @@ impl Consensus for Tasked { || "persist::task::head", async move { backing.head(&key).await }.instrument(Span::current()), ) - .await? + .await } async fn compare_and_set( @@ -465,7 +465,7 @@ impl Consensus for Tasked { async move { backing.compare_and_set(&key, expected, new).await } .instrument(Span::current()), ) - .await? + .await } async fn scan( @@ -480,7 +480,7 @@ impl Consensus for Tasked { || "persist::task::scan", async move { backing.scan(&key, from, limit).await }.instrument(Span::current()), ) - .await? + .await } async fn truncate(&self, key: &str, seqno: SeqNo) -> Result { @@ -490,7 +490,7 @@ impl Consensus for Tasked { || "persist::task::truncate", async move { backing.truncate(&key, seqno).await }.instrument(Span::current()), ) - .await? + .await } } @@ -565,7 +565,7 @@ impl Blob for Tasked { || "persist::task::get", async move { backing.get(&key).await }.instrument(Span::current()), ) - .await? + .await } /// List all of the keys in the map with metadata about the entry. @@ -590,7 +590,7 @@ impl Blob for Tasked { || "persist::task::set", async move { backing.set(&key, value).await }.instrument(Span::current()), ) - .await? + .await } /// Remove a key from the map. @@ -604,7 +604,7 @@ impl Blob for Tasked { || "persist::task::delete", async move { backing.delete(&key).await }.instrument(Span::current()), ) - .await? + .await } async fn restore(&self, key: &str) -> Result<(), ExternalError> { @@ -614,7 +614,7 @@ impl Blob for Tasked { || "persist::task::restore", async move { backing.restore(&key).await }.instrument(Span::current()), ) - .await? 
+ .await } } diff --git a/src/persist/src/s3.rs b/src/persist/src/s3.rs index af32c559eaf4e..9f8c905c53816 100644 --- a/src/persist/src/s3.rs +++ b/src/persist/src/s3.rs @@ -864,14 +864,13 @@ impl S3Blob { let mut parts = Vec::with_capacity(parts_len); for (part_num, part_fut) in part_futs.into_iter() { let (this_part_elapsed, part_res) = part_fut - .await .inspect(|_| { self.metrics .error_counts .with_label_values(&["UploadPart", "AsyncSpawnError"]) .inc() }) - .map_err(|err| anyhow!(err).context("s3 spawn err"))?; + .await; let part_res = part_res .inspect_err(|err| self.update_error_metrics("UploadPart", err)) .context("s3 upload_part err")?; diff --git a/src/service/src/transport.rs b/src/service/src/transport.rs index c0d0f96268c24..934576b1bca89 100644 --- a/src/service/src/transport.rs +++ b/src/service/src/transport.rs @@ -32,7 +32,7 @@ use bincode::Options; use futures::future; use mz_ore::cast::CastInto; use mz_ore::netio::{Listener, SocketAddr, Stream, TimedReader, TimedWriter}; -use mz_ore::task::{AbortOnDropHandle, JoinHandle, JoinHandleExt}; +use mz_ore::task::{AbortOnDropHandle, JoinHandle}; use semver::Version; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -166,7 +166,7 @@ where // Cancel any existing connection before starting to serve the new one. if let Some((task, token)) = connection_task.take() { drop(token); - task.wait_and_assert_finished().await; + task.await; } let handler = handler_fn(); diff --git a/src/sql/src/kafka_util.rs b/src/sql/src/kafka_util.rs index 75236f6304c93..a46746aa8e953 100644 --- a/src/sql/src/kafka_util.rs +++ b/src/sql/src/kafka_util.rs @@ -191,7 +191,6 @@ where } }) .await - .map_err(|e| sql_err!("{}", e))? } // Kafka supports bulk lookup of watermarks, but it is not exposed in rdkafka. @@ -243,7 +242,6 @@ where } }) .await - .map_err(|e| sql_err!("{}", e))? } /// Validates that we can connect to the broker and obtain metadata about the topic. @@ -268,5 +266,4 @@ where } }) .await - .map_err(|e| sql_err!("{}", e))? 
} diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 799648108b609..0cc487cbf50fa 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -1548,11 +1548,10 @@ impl WebhookValidation { ); match tokio::time::timeout(Self::MAX_REDUCE_TIME, reduce_task).await { - Ok(Ok(reduced_expr)) => { + Ok(reduced_expr) => { *expression = reduced_expr; Ok(()) } - Ok(Err(_)) => Err("joining task"), Err(_) => Err("timeout"), } } diff --git a/src/storage-operators/src/oneshot_source.rs b/src/storage-operators/src/oneshot_source.rs index adfd6e820a058..3c3b2263fbc9e 100644 --- a/src/storage-operators/src/oneshot_source.rs +++ b/src/storage-operators/src/oneshot_source.rs @@ -336,8 +336,7 @@ where info!(%worker_id, object = %object.name(), "splitting object"); format_.split_work(source_.clone(), object, checksum).await }) - .await - .expect("failed to spawn task")?; + .await?; requests.extend(work_requests); } diff --git a/src/storage-operators/src/s3_oneshot_sink.rs b/src/storage-operators/src/s3_oneshot_sink.rs index ae063de7e0a90..71e434bc4c070 100644 --- a/src/storage-operators/src/s3_oneshot_sink.rs +++ b/src/storage-operators/src/s3_oneshot_sink.rs @@ -22,7 +22,6 @@ use futures::StreamExt; use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; use mz_ore::future::InTask; -use mz_ore::task::JoinHandleExt; use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp}; use mz_storage_types::connections::ConnectionContext; use mz_storage_types::connections::aws::AwsConnection; @@ -272,7 +271,6 @@ where .await?; Ok::<(), anyhow::Error>(()) }) - .wait_and_assert_finished() .await?; } Ok::(row_count) diff --git a/src/storage-operators/src/s3_oneshot_sink/pgcopy.rs b/src/storage-operators/src/s3_oneshot_sink/pgcopy.rs index aca4812f6f001..a4823795d7826 100644 --- a/src/storage-operators/src/s3_oneshot_sink/pgcopy.rs +++ b/src/storage-operators/src/s3_oneshot_sink/pgcopy.rs @@ -14,7 +14,6 @@ use mz_aws_util::s3_uploader::{ }; use mz_ore::assert_none; use mz_ore::cast::CastFrom; -use mz_ore::task::JoinHandleExt; use mz_pgcopy::{CopyFormatParams, encode_copy_format, encode_copy_format_header}; use mz_repr::{GlobalId, RelationDesc, Row}; use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager; @@ -84,7 +83,7 @@ impl CopyToS3Uploader for PgCopyUploader { total_bytes_uploaded, bucket, key, - } = handle.wait_and_assert_finished().await?; + } = handle.await?; info!( "finished upload: bucket {}, key {}, bytes_uploaded {}, parts_uploaded {}", bucket, key, total_bytes_uploaded, part_count @@ -158,7 +157,7 @@ impl PgCopyUploader { .await; (uploader, sdk_config) }); - let (uploader, sdk_config) = handle.wait_and_assert_finished().await; + let (uploader, sdk_config) = handle.await; self.sdk_config = Some(sdk_config); let mut uploader = uploader?; if self.format.requires_header() { diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 241968b49c606..fd6885c174b1a 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -1077,7 +1077,7 @@ impl KafkaConnection { let consumer = Arc::clone(&consumer); move || consumer.fetch_metadata(None, timeout) }) - .await?; + .await; match result { Ok(_) => Ok(()), // The error returned by `fetch_metadata` does not provide any details which makes for diff --git a/src/storage-types/src/sources/kafka.rs b/src/storage-types/src/sources/kafka.rs index 87cddfa500e04..acdddfcf558b2 100644 --- a/src/storage-types/src/sources/kafka.rs +++ b/src/storage-types/src/sources/kafka.rs @@ -179,7 +179,7 @@ impl 
KafkaSourceConnection { Ok(current_upper) } }) - .await? + .await } } diff --git a/src/storage/examples/upsert_open_loop.rs b/src/storage/examples/upsert_open_loop.rs index 24dfb363d34b5..feb37c4f73037 100644 --- a/src/storage/examples/upsert_open_loop.rs +++ b/src/storage/examples/upsert_open_loop.rs @@ -449,8 +449,7 @@ async fn run_benchmark( .in_scope(|| data_generator_cloned.gen_batch(usize::cast_from(batch_idx))) }, ) - .await - .expect("task failed"); + .await; trace!("data generator {} made a batch", source_id); let batch = match batch { Some(x) => x, @@ -752,7 +751,7 @@ async fn run_benchmark( } for handle in generator_handles { - match handle.await? { + match handle.await { Ok(finished) => info!("{}", finished), Err(e) => error!("error: {:?}", e), } diff --git a/src/storage/src/sink/kafka.rs b/src/storage/src/sink/kafka.rs index abe3623b21905..ada33bcd02274 100644 --- a/src/storage/src/sink/kafka.rs +++ b/src/storage/src/sink/kafka.rs @@ -433,7 +433,6 @@ impl TransactionalProducer { let producer = self.producer.clone(); task::spawn_blocking(|| &self.task_name, move || f(producer)) .await - .unwrap() .check_ssh_status(self.producer.context()) } @@ -1069,7 +1068,7 @@ async fn determine_sink_progress( } } Ok(None) - }).await.unwrap().check_ssh_status(&ctx); + }).await.check_ssh_status(&ctx); // Express interest to the computation until after we've received its result drop(parent_token); result @@ -1311,7 +1310,6 @@ async fn fetch_partition_count( } }) .await - .expect("spawning blocking task cannot fail") .check_ssh_status(producer.context())?; match meta.topics().iter().find(|t| t.name() == topic_name) { diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 02700b725b9e9..d6913eb8dec29 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -1018,7 +1018,7 @@ impl KafkaResumeUpperProcessor { || format!("source({}) kafka offset commit", self.config.id), move || consumer.commit(&tpl, CommitMode::Sync), ) - .await??; + .await?; } Ok(()) } diff --git a/src/testdrive/src/action.rs b/src/testdrive/src/action.rs index 95d1048202d81..68f494373ed97 100644 --- a/src/testdrive/src/action.rs +++ b/src/testdrive/src/action.rs @@ -967,10 +967,8 @@ pub async fn create_state( }) .await?; - let pgconn_task = task::spawn(|| "pgconn_task", pgconn).map(|join| { - join.expect("pgconn_task unexpectedly canceled") - .context("running SQL connection") - }); + let pgconn_task = + task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection")); let materialize_state = create_materialize_state(&config, materialize_catalog_config, pgclient).await?; diff --git a/src/testdrive/src/action/kafka/verify_commit.rs b/src/testdrive/src/action/kafka/verify_commit.rs index f1f299495b491..ef7e8114985c5 100644 --- a/src/testdrive/src/action/kafka/verify_commit.rs +++ b/src/testdrive/src/action/kafka/verify_commit.rs @@ -58,8 +58,7 @@ pub async fn run_verify_commit( ) }, ) - .await - .unwrap()?; + .await?; let found_offset = committed_tpl.elements_for_topic(&topic)[0].offset(); if found_offset != expected_offset { diff --git a/src/testdrive/src/action/postgres/verify_slot.rs b/src/testdrive/src/action/postgres/verify_slot.rs index 23fd95a2a16ca..cc1cd0c651007 100644 --- a/src/testdrive/src/action/postgres/verify_slot.rs +++ b/src/testdrive/src/action/postgres/verify_slot.rs @@ -61,10 +61,7 @@ pub async fn run_verify_slot( .await?; drop(client); - conn_handle - .await - .unwrap() - .context("postgres connection error")?; + 
conn_handle.await.context("postgres connection error")?; Ok(ControlFlow::Continue) } diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index 4f8b3557ecc28..3705f30ebeee9 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -22,7 +22,6 @@ use differential_dataflow::lattice::Lattice; use futures::StreamExt; use mz_dyncfg::{Config, ConfigSet}; use mz_ore::cast::CastFrom; -use mz_ore::task::JoinHandleExt; use mz_persist_client::cfg::RetryParameters; use mz_persist_client::operators::shard_source::{ ErrorHandler, FilterResult, SnapshotMode, shard_source, @@ -696,7 +695,7 @@ impl DataSubscribeTask { pub async fn finish(self) -> Vec<(String, u64, i64)> { // Closing the channel signals the task to exit. drop(self.tx); - self.task.wait_and_assert_finished().await + self.task.await } fn task( diff --git a/src/txn-wal/src/txn_write.rs b/src/txn-wal/src/txn_write.rs index 22c48b379dbff..a360c290e76c2 100644 --- a/src/txn-wal/src/txn_write.rs +++ b/src/txn-wal/src/txn_write.rs @@ -666,7 +666,8 @@ mod tests { txn.write(&ShardId::new(), "foo".into(), (), 1).await; txn.commit_at(&mut txns, 1).await } - }); + }) + .into_tokio_handle(); assert_err!(commit.await); let d0 = txns.expect_register(2).await; @@ -681,7 +682,8 @@ mod tests { txn.write(&d0, "foo".into(), (), 1).await; txn.commit_at(&mut txns, 4).await } - }); + }) + .into_tokio_handle(); assert_err!(commit.await); } diff --git a/src/txn-wal/src/txns.rs b/src/txn-wal/src/txns.rs index 84a7f44c813cd..4bedf10b112a3 100644 --- a/src/txn-wal/src/txns.rs +++ b/src/txn-wal/src/txns.rs @@ -1620,7 +1620,7 @@ mod tests { let mut max_ts = 0; let mut reads = Vec::new(); for worker in workers { - let (t, mut r) = worker.await.unwrap(); + let (t, mut r) = worker.await; max_ts = std::cmp::max(max_ts, t); reads.append(&mut r); } @@ -1638,7 +1638,7 @@ mod tests { info!("now waiting for reads {}", max_ts); for (tx, data_id, as_of, subscribe) in reads { let _ = tx.send(max_ts + 1); - let output = subscribe.await.unwrap(); + let output = subscribe.await; log.assert_eq(data_id, as_of, max_ts + 1, output); } }) @@ -1705,7 +1705,8 @@ mod tests { txn.write(&d0, "bar".into(), (), 1).await; // This panics. let _ = txn.commit_at(&mut txns1, 3).await; - }); + }) + .into_tokio_handle(); assert!(res.await.is_err()); // Forgetting the data shard removes it, so we don't leave the schema