Skip to content

Commit

Permalink
Code for monitoring futures (#5786)
Browse files Browse the repository at this point in the history
fix tests

PR comments
  • Loading branch information
mystenmark committed Nov 2, 2022
1 parent 48f030e commit 2a9ad74
Show file tree
Hide file tree
Showing 54 changed files with 345 additions and 155 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"crates/sui-json-rpc-types",
"crates/sui-keys",
"crates/sui-macros",
+"crates/sui-metrics",
"crates/sui-network",
"crates/sui-node",
"crates/sui-open-rpc",
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sui-storage = { path = "../sui-storage" }
sui-config = { path = "../sui-config" }
sui-json = { path = "../sui-json" }
sui-json-rpc-types = { path = "../sui-json-rpc-types" }
+sui-metrics = { path = "../sui-metrics" }

move-binary-format.workspace = true
move-bytecode-utils.workspace = true
Expand Down
19 changes: 9 additions & 10 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

use arc_swap::ArcSwap;
use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration};
+use sui_metrics::spawn_monitored_task;
use sui_types::{base_types::AuthorityName, error::SuiResult};
use tokio::{
sync::{oneshot, Mutex, MutexGuard},
Expand Down Expand Up @@ -299,9 +300,7 @@ where
let committee = self.state.committee.load().deref().clone();
let target_num_tasks = usize::min(committee.num_members() - 1, degree);

-tokio::task::spawn(async move {
-gossip_process(&self, target_num_tasks).await;
-})
+spawn_monitored_task!(gossip_process(&self, target_num_tasks))
}

/// Restart the node sync process only if one currently exists.
Expand Down Expand Up @@ -361,7 +360,7 @@ where
let node_sync_store = self.state.node_sync_store.clone();

info!("spawning node sync task");
-let join_handle = tokio::task::spawn(node_sync_process(
+let join_handle = spawn_monitored_task!(node_sync_process(
node_sync_handle,
node_sync_store,
epoch,
Expand All @@ -374,9 +373,7 @@ where

/// Spawn pending certificate execution process
pub async fn spawn_execute_process(self: Arc<Self>) -> JoinHandle<()> {
-tokio::task::spawn(async move {
-execution_process(self).await;
-})
+spawn_monitored_task!(execution_process(self))
}

pub async fn cancel_node_sync_process_for_tests(&self) {
Expand All @@ -403,8 +400,10 @@ where
metrics: CheckpointMetrics,
) -> JoinHandle<()> {
// Spawn task to take care of checkpointing
-tokio::task::spawn(async move {
-checkpoint_process(self, &checkpoint_process_control, metrics).await;
-})
+spawn_monitored_task!(checkpoint_process(
+self,
+&checkpoint_process_control,
+metrics
+))
}
}
9 changes: 5 additions & 4 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use prometheus::{
use std::future::Future;
use std::ops::Deref;
use std::{collections::HashSet, sync::Arc, time::Duration};
+use sui_metrics::monitored_future;
use sui_types::committee::StakeUnit;
use sui_types::{
base_types::{AuthorityName, ExecutionDigests},
Expand Down Expand Up @@ -205,7 +206,7 @@ async fn follower_process<A, Handler: DigestHandler<A> + Clone>(
peer_names.insert(name);
let local_active_ref_copy = local_active.clone();
let handler_clone = handler.clone();
-gossip_tasks.push(async move {
+gossip_tasks.push(monitored_future!(async move {
let follower = Follower::new(name, &local_active_ref_copy);
// Add more duration if we make more than 1 to ensure overlap
debug!(peer = ?name, "Starting gossip from peer");
Expand All @@ -215,7 +216,7 @@ async fn follower_process<A, Handler: DigestHandler<A> + Clone>(
handler_clone,
)
.await
-});
+}));
k += 1;

// If we have already used all the good stake, then stop here and
Expand Down Expand Up @@ -445,10 +446,10 @@ where
metrics.total_tx_received.inc();

let fut = handler.handle_digest(self, digests).await?;
-results.push_back(async move {
+results.push_back(monitored_future!(async move {
fut.await?;
Ok::<(TxSequenceNumber, ExecutionDigests), SuiError>((seq, digests))
-});
+}));

self.state.metrics.gossip_queued_count.inc();
},
Expand Down
24 changes: 11 additions & 13 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use move_core_types::value::MoveStructLayout;
use mysten_network::config::Config;
use sui_config::genesis::Genesis;
use sui_config::NetworkConfig;
+use sui_metrics::{monitored_future, spawn_monitored_task};
use sui_network::{
default_mysten_network_config, DEFAULT_CONNECT_TIMEOUT_SEC, DEFAULT_REQUEST_TIMEOUT_SEC,
};
Expand Down Expand Up @@ -505,7 +506,7 @@ where
.unwrap()
.clone();
let authority_clients = self.authority_clients.clone();
-if let Ok(res) = timeout(total_timeout, tokio::spawn(async move {
+if let Ok(res) = timeout(total_timeout, spawn_monitored_task!(async move {
Self::sync_certificate_to_authority_with_timeout(
&committee,
&authority_clients,
Expand Down Expand Up @@ -743,14 +744,14 @@ where
.map(|name| {
let client = &self.authority_clients[name];
let execute = map_each_authority.clone();
-async move {
+monitored_future!(async move {
(
*name,
execute(*name, client)
.instrument(tracing::trace_span!("quorum_map_auth", authority =? name.concise()))
.await,
)
-}
+})
})
.collect();

Expand Down Expand Up @@ -817,19 +818,19 @@ where

let start_req = |name: AuthorityName, client: SafeClient<A>| {
let map_each_authority = map_each_authority.clone();
-Box::pin(async move {
+Box::pin(monitored_future!(async move {
trace!(?name, now = ?tokio::time::Instant::now() - start, "new request");
let map = map_each_authority(name, client);
Event::Request(name, timeout(timeout_each_authority, map).await)
-})
+}))
};

let schedule_next = || {
let delay = self.timeouts.serial_authority_request_interval;
-Box::pin(async move {
+Box::pin(monitored_future!(async move {
sleep(delay).await;
Event::StartNext
-})
+}))
};

// This process is intended to minimize latency in the face of unreliable authorities,
Expand Down Expand Up @@ -1962,12 +1963,9 @@ where
let (sender, receiver) = tokio::sync::mpsc::channel(OBJECT_DOWNLOAD_CHANNEL_BOUND);
for object_ref in object_refs {
let sender = sender.clone();
-tokio::spawn(Self::fetch_one_object(
-self.authority_clients.clone(),
-object_ref,
-self.timeouts.authority_request_timeout,
-sender,
-));
+let client = self.authority_clients.clone();
+let timeout = self.timeouts.authority_request_timeout;
+spawn_monitored_task!(Self::fetch_one_object(client, object_ref, timeout, sender,));
}
// Close unused channel
drop(sender);
Expand Down
9 changes: 4 additions & 5 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::sync::Arc;
use std::time::Duration;
use sui_config::genesis::Genesis;
use sui_config::ValidatorInfo;
+use sui_metrics::spawn_monitored_task;
use sui_network::{api::ValidatorClient, tonic};
use sui_types::base_types::AuthorityName;
use sui_types::committee::CommitteeWithNetAddresses;
Expand Down Expand Up @@ -412,11 +413,9 @@ impl AuthorityAPI for LocalAuthorityClient {
) -> Result<TransactionInfoResponse, SuiError> {
let state = self.state.clone();
let fault_config = self.fault_config;
-tokio::spawn(
-async move { Self::handle_certificate(state, certificate, fault_config).await },
-)
-.await
-.unwrap()
+spawn_monitored_task!(Self::handle_certificate(state, certificate, fault_config))
+.await
+.unwrap()
}

async fn handle_account_info_request(
Expand Down
Loading

0 comments on commit 2a9ad74

Please sign in to comment.