Skip to content

Commit 218fdbc

Browse files
authored
fix: remove the idle future (#2285)
the "idle" method was fairly complex, and it had a few bugs: - if it was called before any transactions happened, it would not return immediately, instead it would wait for "idle_sec" (because the last_time defaults to now()). - If the last transaction time happened more than "idle_sec" ago, it would panic because the u64 subtraction would underflow. It was used to notify the main loop that it needs to break. This can now be a simple check after each iteration. These are both very niche cases and almost impossible to happen, from the way the method is used; but leaving it increases the risk of misusing the method in the future.
1 parent 71a70ed commit 218fdbc

File tree

2 files changed

+47
-75
lines changed

2 files changed

+47
-75
lines changed

rs/bitcoin/adapter/src/lib.rs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,12 @@
77
use bitcoin::{network::message::NetworkMessage, BlockHash, BlockHeader};
88
use ic_logger::ReplicaLogger;
99
use ic_metrics::MetricsRegistry;
10-
use std::time::Duration;
1110
use std::{
1211
net::SocketAddr,
1312
sync::{Arc, Mutex},
1413
time::Instant,
1514
};
16-
use tokio::{
17-
select,
18-
sync::{mpsc::channel, watch},
19-
time::sleep,
20-
};
15+
use tokio::sync::{mpsc::channel, watch};
2116
/// This module contains the AddressManager struct. The struct stores addresses
2217
/// that will be used to create new connections. It also tracks addresses that
2318
/// are in current use to encourage use from non-utilized addresses.
@@ -178,24 +173,6 @@ impl AdapterState {
178173
)
179174
}
180175

181-
/// A future that returns when/if the adapter becomes/is idle.
182-
pub async fn idle(&mut self) {
183-
let mut last_time = self
184-
.last_received_rx
185-
.borrow_and_update()
186-
.unwrap_or_else(Instant::now);
187-
188-
loop {
189-
let seconds_left_until_idle = self.idle_seconds - last_time.elapsed().as_secs();
190-
select! {
191-
_ = sleep(Duration::from_secs(seconds_left_until_idle)) => {return},
192-
Ok(_) = self.last_received_rx.changed() => {
193-
last_time = self.last_received_rx.borrow_and_update().unwrap_or_else(Instant::now);
194-
}
195-
}
196-
}
197-
}
198-
199176
/// A future that returns when/if the adapter becomes/is awake.
200177
pub async fn active(&mut self) {
201178
let _ = self

rs/bitcoin/adapter/src/router.rs

Lines changed: 46 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -59,61 +59,56 @@ pub fn start_main_event_loop(
5959

6060
// We do a select over tokio::sync::mpsc::Receiver::recv, tokio::sync::mpsc::UnboundedReceiver::recv,
6161
// tokio::time::Interval::tick which are all cancellation safe.
62-
loop {
63-
tokio::select! {
64-
_ = adapter_state.idle() => {
65-
break;
66-
},
67-
event = connection_manager.receive_stream_event() => {
68-
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) =
69-
connection_manager.process_event(&event)
70-
{
71-
connection_manager.discard(&event.address);
72-
}
73-
},
74-
network_message = network_message_receiver.recv() => {
75-
let (address, message) = network_message.unwrap();
76-
router_metrics
77-
.bitcoin_messages_received
78-
.with_label_values(&[message.cmd()])
79-
.inc();
80-
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) =
81-
connection_manager.process_bitcoin_network_message(address, &message) {
82-
connection_manager.discard(&address);
83-
}
62+
tokio::select! {
63+
event = connection_manager.receive_stream_event() => {
64+
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) =
65+
connection_manager.process_event(&event)
66+
{
67+
connection_manager.discard(&event.address);
68+
}
69+
},
70+
network_message = network_message_receiver.recv() => {
71+
let (address, message) = network_message.unwrap();
72+
router_metrics
73+
.bitcoin_messages_received
74+
.with_label_values(&[message.cmd()])
75+
.inc();
76+
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) =
77+
connection_manager.process_bitcoin_network_message(address, &message) {
78+
connection_manager.discard(&address);
79+
}
8480

85-
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = blockchain_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) {
86-
connection_manager.discard(&address);
87-
}
88-
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = transaction_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) {
89-
connection_manager.discard(&address);
90-
}
91-
},
92-
result = blockchain_manager_rx.recv() => {
93-
let command = result.expect("Receiving should not fail because the sender part of the channel is never closed.");
94-
match command {
95-
BlockchainManagerRequest::EnqueueNewBlocksToDownload(next_headers) => {
96-
blockchain_manager.enqueue_new_blocks_to_download(next_headers);
97-
}
98-
BlockchainManagerRequest::PruneBlocks(anchor, processed_block_hashes) => {
99-
blockchain_manager.prune_blocks(anchor, processed_block_hashes);
100-
}
101-
};
81+
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = blockchain_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) {
82+
connection_manager.discard(&address);
83+
}
84+
if let Err(ProcessBitcoinNetworkMessageError::InvalidMessage) = transaction_manager.process_bitcoin_network_message(&mut connection_manager, address, &message) {
85+
connection_manager.discard(&address);
10286
}
103-
transaction_manager_request = transaction_manager_rx.recv() => {
104-
match transaction_manager_request.unwrap() {
105-
TransactionManagerRequest::SendTransaction(transaction) => transaction_manager.enqueue_transaction(&transaction),
87+
},
88+
result = blockchain_manager_rx.recv() => {
89+
let command = result.expect("Receiving should not fail because the sender part of the channel is never closed.");
90+
match command {
91+
BlockchainManagerRequest::EnqueueNewBlocksToDownload(next_headers) => {
92+
blockchain_manager.enqueue_new_blocks_to_download(next_headers);
10693
}
107-
},
108-
_ = tick_interval.tick() => {
109-
// After an event is dispatched, the managers `tick` method is called to process possible
110-
// outgoing messages.
111-
connection_manager.tick(blockchain_manager.get_height(), handle_stream);
112-
blockchain_manager.tick(&mut connection_manager);
113-
transaction_manager.advertise_txids(&mut connection_manager);
94+
BlockchainManagerRequest::PruneBlocks(anchor, processed_block_hashes) => {
95+
blockchain_manager.prune_blocks(anchor, processed_block_hashes);
96+
}
97+
};
98+
}
99+
transaction_manager_request = transaction_manager_rx.recv() => {
100+
match transaction_manager_request.unwrap() {
101+
TransactionManagerRequest::SendTransaction(transaction) => transaction_manager.enqueue_transaction(&transaction),
114102
}
115-
};
116-
}
103+
},
104+
_ = tick_interval.tick() => {
105+
// After an event is dispatched, the managers `tick` method is called to process possible
106+
// outgoing messages.
107+
connection_manager.tick(blockchain_manager.get_height(), handle_stream);
108+
blockchain_manager.tick(&mut connection_manager);
109+
transaction_manager.advertise_txids(&mut connection_manager);
110+
}
111+
};
117112
}
118113
});
119114
}

0 commit comments

Comments
 (0)