Skip to content

Commit 01b2473

Browse files
committed
fix(consensus-manager): Add exponential backoff for artifact timeout
1 parent acb530c commit 01b2473

File tree

3 files changed

+34
-19
lines changed

3 files changed

+34
-19
lines changed

rs/p2p/consensus_manager/src/receiver.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use axum::{
1111
routing::any,
1212
Extension, Router,
1313
};
14+
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
1415
use bytes::Bytes;
1516
use crossbeam_channel::Sender as CrossbeamSender;
1617
use ic_interfaces::p2p::consensus::{PriorityFnAndFilterProducer, ValidatedPoolReader};
@@ -36,7 +37,8 @@ use tokio::{
3637
time::{self, sleep_until, timeout_at, Instant, MissedTickBehavior},
3738
};
3839

39-
const ARTIFACT_RPC_TIMEOUT: Duration = Duration::from_secs(5);
40+
const MIN_ARTIFACT_RPC_TIMEOUT: Duration = Duration::from_secs(5);
41+
const MAX_ARTIFACT_RPC_TIMEOUT: Duration = Duration::from_secs(120);
4042
const PRIORITY_FUNCTION_UPDATE_INTERVAL: Duration = Duration::from_secs(3);
4143

4244
type ValidatedPoolReaderRef<T> = Arc<RwLock<dyn ValidatedPoolReader<T> + Send + Sync>>;
@@ -504,6 +506,12 @@ where
504506
)
505507
.await?;
506508

509+
let mut artifact_download_timeout = ExponentialBackoffBuilder::new()
510+
.with_initial_interval(MIN_ARTIFACT_RPC_TIMEOUT)
511+
.with_max_interval(MAX_ARTIFACT_RPC_TIMEOUT)
512+
.with_max_elapsed_time(None)
513+
.build();
514+
507515
match artifact {
508516
// Artifact was pushed by peer. In this case we don't need to check that the artifact ID corresponds
509517
// to the artifact because we earlier derived the ID from the artifact.
@@ -527,7 +535,14 @@ where
527535
.body(bytes)
528536
.unwrap();
529537

530-
let next_request_at = Instant::now() + ARTIFACT_RPC_TIMEOUT;
538+
if peer_rx.has_changed().unwrap_or(false) {
539+
artifact_download_timeout.reset();
540+
}
541+
542+
let next_request_at = Instant::now()
543+
+ artifact_download_timeout
544+
.next_backoff()
545+
.unwrap_or(MAX_ARTIFACT_RPC_TIMEOUT);
531546
match timeout_at(next_request_at, transport.rpc(&peer, request)).await {
532547
Ok(Ok(response)) if response.status() == StatusCode::OK => {
533548
let body = response.into_body();

rs/p2p/memory_transport/src/lib.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl TransportRouter {
127127
pub fn add_peer(
128128
&mut self,
129129
node_id: NodeId,
130-
mut router: Router,
130+
router: Router,
131131
latency: Duration,
132132
capacity: usize,
133133
) -> PeerTransport {
@@ -151,15 +151,19 @@ impl TransportRouter {
151151
parts.extensions.insert(ConnId::from(u64::MAX));
152152
let req = Request::from_parts(parts, Body::from(body));
153153

154-
// Call request handler
155-
let resp = router.ready().await.unwrap().call(req).await.unwrap();
154+
let router_resp_tx = router_resp_tx.clone();
155+
let mut router = router.clone();
156+
tokio::spawn(async move {
157+
// Call request handler
158+
let resp = router.ready().await.unwrap().call(req).await.unwrap();
156159

157-
// Transform response back to `Response<Bytes>` and attach this node in the extension map.
158-
let (mut parts, body) = resp.into_parts();
159-
let body = to_bytes(body).await.unwrap();
160-
parts.extensions.insert(this_node_id);
161-
let resp = Response::from_parts(parts, body);
162-
let _ = router_resp_tx.send((resp, origin_id, oneshot_tx));
160+
// Transform response back to `Response<Bytes>` and attach this node in the extension map.
161+
let (mut parts, body) = resp.into_parts();
162+
let body = to_bytes(body).await.unwrap();
163+
parts.extensions.insert(this_node_id);
164+
let resp = Response::from_parts(parts, body);
165+
let _ = router_resp_tx.send((resp, origin_id, oneshot_tx));
166+
});
163167
}
164168
});
165169

rs/p2p/test_utils/src/consensus.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,20 +180,16 @@ impl TestConsensus<U64Artifact> {
180180

181181
pub fn push_advert(&self, id: u64) {
182182
let mut inner = self.inner.lock().unwrap();
183-
inner
184-
.peer_pool
185-
.entry(self.node_id)
186-
.and_modify(|p| p.insert(id));
183+
let my_pool = inner.peer_pool.get_mut(&self.node_id).unwrap();
184+
my_pool.insert(id);
187185

188186
inner.adverts.push_back(id);
189187
}
190188

191189
pub fn push_purge(&self, id: u64) {
192190
let mut inner = self.inner.lock().unwrap();
193-
inner
194-
.peer_pool
195-
.entry(self.node_id)
196-
.and_modify(|p| p.remove(id));
191+
let my_pool = inner.peer_pool.get_mut(&self.node_id).unwrap();
192+
my_pool.remove(id);
197193

198194
inner.purge.push_back(id);
199195
}

0 commit comments

Comments
 (0)