Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3372,7 +3372,7 @@ mod tests {

let handles = Catalog::with_debug(|catalog| async { inner(catalog) }).await;
for handle in handles {
handle.await.expect("must succeed");
handle.await;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1418,9 +1418,9 @@ impl CatalogState {
}

// Wait for a view to be ready.
let (handle, _idx, remaining) = future::select_all(handles).await;
let (selected, _idx, remaining) = future::select_all(handles).await;
handles = remaining;
let (id, global_id, res) = handle.expect("must join");
let (id, global_id, res) = selected;
let mut insert_cached_expr = |cached_expr| {
if let Some(cached_expr) = cached_expr {
local_expression_cache.insert_cached_expression(global_id, cached_expr);
Expand Down
5 changes: 1 addition & 4 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2553,15 +2553,12 @@ impl Coordinator {

let retractions_res = futures::future::join_all(retraction_tasks).await;
for retractions in retractions_res {
let retractions = retractions.expect("cannot fail to fetch snapshot");
builtin_table_updates.push(retractions);
}

let audit_join_start = Instant::now();
info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
let audit_log_updates = audit_log_task
.await
.expect("cannot fail to fetch audit log updates");
let audit_log_updates = audit_log_task.await;
let audit_log_builtin_table_updates = self
.catalog()
.state()
Expand Down
11 changes: 1 addition & 10 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,7 @@ impl Coordinator {
spawn(|| "sequence_staged", async move {
tokio::select! {
res = handle => {
let next = match res {
Ok(next) => return_if_err!(next, ctx),
Err(err) => {
tracing::error!("sequence_staged join error {err}");
ctx.retire(Err(AdapterError::Internal(
"sequence_staged join error".into(),
)));
return;
}
};
let next = return_if_err!(res, ctx);
f(ctx, next);
}
_ = rx, if cancel_enabled => {
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/explain/insights.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl PlanInsights {
});
for task in tasks {
let res = task.await;
let Ok(Ok((name, plan))) = res else {
let Ok((name, plan)) = res else {
continue;
};
let (plan, _, _) = plan.unapply();
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ impl AppendWebhookValidator {
.map_err(|e| {
tracing::error!("Failed to run validation for webhook, {e}");
AppendWebhookError::ValidationError
})??;
})?;

valid
}
Expand Down
4 changes: 2 additions & 2 deletions src/aws-util/src/s3_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use bytes::{Bytes, BytesMut};
use bytesize::ByteSize;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::task::{JoinHandle, JoinHandleExt, spawn};
use mz_ore::task::{JoinHandle, spawn};

/// A multi part uploader which can upload a single object across multiple parts
/// and keeps track of state to eventually finish the upload process.
Expand Down Expand Up @@ -226,7 +226,7 @@ impl S3MultiPartUploader {

let mut parts: Vec<CompletedPart> = Vec::with_capacity(self.upload_handles.len());
for handle in self.upload_handles {
let (etag, part_num) = handle.wait_and_assert_finished().await?;
let (etag, part_num) = handle.await?;
match etag {
Some(etag) => {
parts.push(
Expand Down
2 changes: 1 addition & 1 deletion src/balancerd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ async fn test_balancer() {
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
handle.await;
}
let end_auth_count = *frontegg_server.auth_requests.lock().unwrap();
// We expect that the auth count increased by fewer than the number of connections.
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/tests/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) {
.await
.unwrap();

state_handle.await.unwrap();
state_handle.await;

let mut state = state_builder
.clone()
Expand Down Expand Up @@ -625,7 +625,7 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) {
.await
.unwrap();

state_handle.await.unwrap();
state_handle.await;

let configs = state_builder
.clone()
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/tests/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ async fn test_concurrent_open(state_builder: TestCatalogStateBuilder) {
.unwrap()
.0;

state_handle.await.unwrap();
state_handle.await;

// Open again to ensure that we didn't commit an invalid retraction.
let _state = state_builder
Expand Down
11 changes: 3 additions & 8 deletions src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3343,14 +3343,10 @@ async fn webhook_concurrent_actions() {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// Stop the threads.
keep_sending.store(false, std::sync::atomic::Ordering::Relaxed);
let results = poster.await.expect("thread panicked!");
let results = poster.await;

// Inspect the results.
let mut results = results
.into_iter()
.collect::<Result<Vec<_>, _>>()
.expect("no join failures")
.into_iter();
let mut results = results.into_iter().collect::<Vec<_>>().into_iter();

for _ in 0..num_requests_before_drop {
let response = results.next().expect("element");
Expand Down Expand Up @@ -3463,8 +3459,7 @@ fn webhook_concurrency_limit() {
}
let results = server
.runtime()
.block_on(futures::future::try_join_all(handles))
.expect("failed to wait for requests");
.block_on(futures::future::join_all(handles));

let successes = results
.iter()
Expand Down
14 changes: 7 additions & 7 deletions src/environmentd/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use mz_ore::collections::CollectionExt;
use mz_ore::now::{EpochMillis, NOW_ZERO, NowFn};
use mz_ore::result::ResultExt;
use mz_ore::retry::Retry;
use mz_ore::task::{self, AbortOnDropHandle, JoinHandleExt};
use mz_ore::task::{self, AbortOnDropHandle};
use mz_ore::{assert_contains, assert_err, assert_ok};
use mz_pgrepr::UInt4;
use mz_repr::Timestamp;
Expand Down Expand Up @@ -194,7 +194,7 @@ async fn test_no_block() {
.expect("server unexpectedly closed channel");

println!("test_no_block: joining task");
slow_task.await.unwrap();
slow_task.await;
};

tokio::time::timeout(Duration::from_secs(120), test_case)
Expand Down Expand Up @@ -297,7 +297,7 @@ async fn test_drop_connection_race() {
.expect("server unexpectedly closed channel");

info!("test_drop_connection_race: asserting response");
let source_res = source_task.await.unwrap();
let source_res = source_task.into_tokio_handle().await.unwrap();
assert_contains!(
source_res.unwrap_err().to_string(),
"unknown catalog item 'conn'"
Expand Down Expand Up @@ -3746,7 +3746,7 @@ async fn test_cancel_linearize_read_then_writes() {
)
.await;
res.unwrap();
handle.await.unwrap();
handle.await;
}

// Test that builtin objects are created in the schemas they advertise in builtin.rs.
Expand Down Expand Up @@ -3817,7 +3817,7 @@ async fn test_serialized_ddl_serial() {
let mut successes = 0;
let mut errors = 0;
for handle in handles {
let result = handle.await.unwrap();
let result = handle.await;
match result {
Ok(_) => {
successes += 1;
Expand Down Expand Up @@ -3869,11 +3869,11 @@ async fn test_serialized_ddl_cancel() {
// Cancel the pending statement (this uses different cancellation logic and is the actual thing
// we are trying to test here).
cancel2.cancel_query(tokio_postgres::NoTls).await.unwrap();
let err = handle2.await.unwrap();
let err = handle2.await;
assert_contains!(err.to_string(), "canceling statement due to user request");
// Cancel the in-progress statement.
cancel1.cancel_query(tokio_postgres::NoTls).await.unwrap();
let err = handle1.await.unwrap();
let err = handle1.await;
assert_contains!(err.to_string(), "canceling statement due to user request");

// The mz_sleep calls above cause this test to not exit until the optimization tasks have fully
Expand Down
2 changes: 1 addition & 1 deletion src/frontegg-mock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
println!("frontegg-mock listening...");
println!(" HTTP address: {}", server.base_url);

server.handle.await??;
server.handle.await?;
anyhow::bail!("serving tasks unexpectedly exited");
}
6 changes: 2 additions & 4 deletions src/mysql-util/src/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;

use mz_ore::future::{InTask, TimeoutError};
use mz_ore::option::OptionExt;
use mz_ore::task::{JoinHandleExt, spawn};
use mz_ore::task::spawn;
use mz_repr::CatalogItemId;
use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig};
use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
Expand Down Expand Up @@ -371,9 +371,7 @@ impl Config {
opts_builder: OptsBuilder,
) -> Result<mysql_async::Conn, MySqlError> {
let connection_future = if let InTask::Yes = self.in_task {
spawn(|| "mysql_connect".to_string(), Conn::new(opts_builder))
.abort_on_drop()
.wait_and_assert_finished()
Box::pin(spawn(|| "mysql_connect".to_string(), Conn::new(opts_builder)).abort_on_drop())
} else {
Conn::new(opts_builder)
};
Expand Down
4 changes: 2 additions & 2 deletions src/ore/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use pin_project::pin_project;
use tokio::task::futures::TaskLocalFuture;
use tokio::time::{self, Duration, Instant};

use crate::task::{self, JoinHandleExt};
use crate::task;

/// Whether or not to run the future in `run_in_task_if` in a task.
#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -126,7 +126,7 @@ where
T: Send + 'static,
T::Output: Send + 'static,
{
task::spawn(nc, self).wait_and_assert_finished().await
task::spawn(nc, self).await
}

async fn run_in_task_if<Name, NameClosure>(self, in_task: InTask, nc: NameClosure) -> T::Output
Expand Down
Loading