Skip to content

Commit 1c246a7

Browse files
DaniPopesCopilot
andauthored
feat(providers): pause heartbeat when no transactions are pending (#2800)
* feat(providers): pause heartbeat when no transactions are pending * chore: downgrade log * wip: tokio * fix * log * Update crates/provider/src/blocks.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent dd96dfe commit 1c246a7

File tree

3 files changed

+74
-6
lines changed

3 files changed

+74
-6
lines changed

crates/provider/src/blocks.rs

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,14 @@ use alloy_transport::RpcError;
55
use async_stream::stream;
66
use futures::{Stream, StreamExt};
77
use lru::LruCache;
8-
use std::{marker::PhantomData, num::NonZeroUsize};
8+
use std::{
9+
marker::PhantomData,
10+
num::NonZeroUsize,
11+
sync::{
12+
atomic::{AtomicBool, Ordering},
13+
Arc,
14+
},
15+
};
916

1017
#[cfg(feature = "pubsub")]
1118
use futures::{future::Either, FutureExt};
@@ -19,6 +26,38 @@ const MAX_RETRIES: usize = 3;
1926
/// Default block number for when we don't have a block yet.
2027
const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;
2128

29+
#[derive(Default)]
30+
pub(crate) struct Paused {
31+
is_paused: AtomicBool,
32+
notify: tokio::sync::Notify,
33+
}
34+
35+
impl Paused {
36+
pub(crate) fn is_paused(&self) -> bool {
37+
self.is_paused.load(Ordering::Acquire)
38+
}
39+
40+
pub(crate) fn set_paused(&self, paused: bool) {
41+
self.is_paused.store(paused, Ordering::Release);
42+
if !paused {
43+
self.notify.notify_waiters();
44+
}
45+
}
46+
47+
/// Waits until the paused state is changed to `false`.
48+
///
49+
/// Returns `true` if the method actually waited for the paused state to become unpaused,
50+
/// or `false` if it was already unpaused when called.
51+
async fn wait(&self) -> bool {
52+
if !self.is_paused() {
53+
return false;
54+
}
55+
self.notify.notified().await;
56+
debug_assert!(!self.is_paused());
57+
true
58+
}
59+
}
60+
2261
/// Streams new blocks from the client.
2362
pub(crate) struct NewBlocks<N: Network = Ethereum> {
2463
client: WeakClient,
@@ -28,6 +67,7 @@ pub(crate) struct NewBlocks<N: Network = Ethereum> {
2867
next_yield: BlockNumber,
2968
/// LRU cache of known blocks. Only used by the polling task.
3069
known_blocks: LruCache<BlockNumber, N::BlockResponse>,
70+
pub(crate) paused: Arc<Paused>,
3171
_phantom: PhantomData<N>,
3272
}
3373

@@ -37,6 +77,7 @@ impl<N: Network> NewBlocks<N> {
3777
client,
3878
next_yield: NO_BLOCK_NUMBER,
3979
known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
80+
paused: Arc::default(),
4081
_phantom: PhantomData,
4182
}
4283
}
@@ -123,13 +164,17 @@ impl<N: Network> NewBlocks<N> {
123164
yield known_block;
124165
}
125166

167+
// If we're paused, wait until we're unpaused.
168+
// Once unpaused, reset `self.next_yield` to ignore the blocks that were included while we were paused.
169+
let unpaused = self.paused.wait().await;
170+
126171
// Get the tip.
127172
let Some(block_number) = numbers_stream.next().await else {
128173
debug!("polling stream ended");
129174
break 'task;
130175
};
131176
trace!(%block_number, "got block number");
132-
if self.next_yield == NO_BLOCK_NUMBER {
177+
if self.next_yield == NO_BLOCK_NUMBER || unpaused {
133178
assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
134179
// this stream can be initialized after the first tx was sent,
135180
// to avoid the edge case where the tx is mined immediately, we should apply an

crates/provider/src/heart.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Block heartbeat and pending transaction watcher.
22
3-
use crate::{Provider, RootProvider};
3+
use crate::{blocks::Paused, Provider, RootProvider};
44
use alloy_consensus::BlockHeader;
55
use alloy_json_rpc::RpcError;
66
use alloy_network::{BlockResponse, Network};
@@ -14,6 +14,7 @@ use std::{
1414
collections::{BTreeMap, VecDeque},
1515
fmt,
1616
future::Future,
17+
sync::Arc,
1718
time::Duration,
1819
};
1920
use tokio::{
@@ -463,18 +464,22 @@ pub(crate) struct Heartbeat<N, S> {
463464
/// Ordered map of transactions to reap at a certain time.
464465
reap_at: BTreeMap<Instant, B256>,
465466

467+
/// Whether the heartbeat is currently paused.
468+
paused: Arc<Paused>,
469+
466470
_network: std::marker::PhantomData<N>,
467471
}
468472

469473
impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
470474
/// Create a new heartbeat task.
471-
pub(crate) fn new(stream: S) -> Self {
475+
pub(crate) fn new(stream: S, is_paused: Arc<Paused>) -> Self {
472476
Self {
473477
stream: stream.fuse(),
474478
past_blocks: Default::default(),
475479
unconfirmed: Default::default(),
476480
waiting_confs: Default::default(),
477481
reap_at: Default::default(),
482+
paused: is_paused,
478483
_network: Default::default(),
479484
}
480485
}
@@ -531,6 +536,20 @@ impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat
531536
}
532537
}
533538

539+
/// Check if we have any pending transactions.
540+
fn has_pending_transactions(&self) -> bool {
541+
!self.unconfirmed.is_empty() || !self.waiting_confs.is_empty()
542+
}
543+
544+
/// Update the pause state based on whether we have pending transactions.
545+
fn update_pause_state(&mut self) {
546+
let should_pause = !self.has_pending_transactions();
547+
if self.paused.is_paused() != should_pause {
548+
debug!(paused = should_pause, "updating heartbeat pause state");
549+
self.paused.set_paused(should_pause);
550+
}
551+
}
552+
534553
/// Handle a watch instruction by adding it to the watch list, and
535554
/// potentially adding it to our `reap_at` list.
536555
fn handle_watch_ix(&mut self, to_watch: TxWatcher) {
@@ -597,7 +616,8 @@ impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat
597616
// Check that the chain is continuous.
598617
if *last_height + 1 != block_height {
599618
// Move all the transactions that were reset by the reorg to the unconfirmed list.
600-
warn!(%block_height, last_height, "reorg detected");
619+
// This can also happen if we unpaused the heartbeat after some time.
620+
debug!(block_height, last_height, "reorg/unpause detected");
601621
self.move_reorg_to_unconfirmed(block_height);
602622
// Remove past blocks that are now invalid.
603623
self.past_blocks.retain(|(h, _)| *h < block_height);
@@ -663,6 +683,8 @@ impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat
663683
async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
664684
'shutdown: loop {
665685
{
686+
self.update_pause_state();
687+
666688
let next_reap = self.next_reap();
667689
let sleep = std::pin::pin!(sleep_until(next_reap.into()));
668690

crates/provider/src/provider/root.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,9 @@ impl<N: Network> RootProvider<N> {
115115
pub(crate) fn get_heart(&self) -> &HeartbeatHandle {
116116
self.inner.heart.get_or_init(|| {
117117
let new_blocks = NewBlocks::<N>::new(self.inner.weak_client());
118+
let paused = new_blocks.paused.clone();
118119
let stream = new_blocks.into_stream();
119-
Heartbeat::<N, _>::new(Box::pin(stream)).spawn()
120+
Heartbeat::<N, _>::new(Box::pin(stream), paused).spawn()
120121
})
121122
}
122123
}

0 commit comments

Comments
 (0)