Skip to content

Commit

Permalink
refactor: restrict the usage of unbounded channels
Browse files Browse the repository at this point in the history
  • Loading branch information
rumenov committed Feb 20, 2024
1 parent f9ac12e commit 23cfc89
Show file tree
Hide file tree
Showing 24 changed files with 49 additions and 6 deletions.
14 changes: 10 additions & 4 deletions clippy.toml
@@ -1,7 +1,13 @@
too-many-arguments-threshold = 12
disallowed-methods = [
{ path = "bincode::deserialize_from" , reason = "bincode::deserialize_from() is not safe to use on untrusted data, since the method will read a u64 length value from the first 8 bytes of the serialized payload and will then attempt to allocate this number of bytes without any validation." },
{ path = "std::io::Write::write" , reason = "`Write::write()` may not write the entire buffer. Use `Write::write_all()` instead. Or, if you are intentionally using `Write::write()`, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
{ path = "tokio::io::AsyncWriteExt::write" , reason = "`AsyncWriteExt::write()` may not write the entire buffer. Use `AsyncWriteExt::write_all()` instead. Or, if you are intentionally using `Write::write()`, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
{ path = "tokio::task::block_in_place" , reason = "`block_in_place()` almost always signals that there is an issue with the overall design. Furthermore, `block_in_place()` panics unless the Tokio scheduler has enough available threads to move tasks. If you are intentionally using `block_in_place()`, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." }
{ path = "bincode::deserialize_from", reason = "bincode::deserialize_from() is not safe to use on untrusted data, since the method will read a u64 length value from the first 8 bytes of the serialized payload and will then attempt to allocate this number of bytes without any validation." },
{ path = "std::io::Write::write", reason = "`Write::write()` may not write the entire buffer. Use `Write::write_all()` instead. Or, if you are intentionally using `Write::write()`, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
{ path = "tokio::io::AsyncWriteExt::write", reason = "`AsyncWriteExt::write()` may not write the entire buffer. Use `AsyncWriteExt::write_all()` instead. Or, if you are intentionally using `Write::write()`, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
{ path = "tokio::task::block_in_place", reason = "`block_in_place()` almost always signals that there is an issue with the overall design. Furthermore, `block_in_place()` panics unless the Tokio scheduler has enough available threads to move tasks. If you are intentionally using `block_in_place()`, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
# unbounded channels are for expert use only
{ path = "tokio::sync::mpsc::unbounded_channel", reason = "Using an unbounded channel can lead to unbounded memory growth. Please use a bounded channel instead. If you are intentionally using an unbounded channel, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
{ path = "futures::channel::mpsc::unbounded", reason = "Using an unbounded channel most likely will read to unbounded memory growth. Please use a bounded channel instead. If you are intentionally using unbounded channel, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
{ path = "futures_channel::mpsc::unbounded", reason = "Using an unbounded channel most likely will read to unbounded memory growth. Please use a bounded channel instead. If you are intentionally using unbounded channel, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
{ path = "crossbeam::channel::unbounded", reason = "Using an unbounded channel most likely will read to unbounded memory growth. Please use a bounded channel instead. If you are intentionally using unbounded channel, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
{ path = "crossbeam_channel::unbounded", reason = "Using an unbounded channel most likely will read to unbounded memory growth. Please use a bounded channel instead. If you are intentionally using unbounded channel, use `#[allow(clippy::disallowed_methods)]` to locally disable this check." },
]
1 change: 1 addition & 0 deletions rs/artifact_manager/src/lib.rs
Expand Up @@ -239,6 +239,7 @@ where
// will result on slow consuption of chunks. Slow consumption of chunks will in turn
// result in slower consumptions of adverts. Ideally adverts are consumed at rate
// independent of consensus.
#[allow(clippy::disallowed_methods)]
let (sender, receiver) = unbounded_channel();
let shutdown = Arc::new(AtomicBool::new(false));

Expand Down
1 change: 1 addition & 0 deletions rs/bitcoin/adapter/src/connection.rs
Expand Up @@ -325,6 +325,7 @@ mod test {
let addr = SocketAddr::from_str("127.0.0.1:8333").expect("invalid string");
let address_entry = AddressEntry::Discovered(addr);
let handle = runtime.spawn(async {});
#[allow(clippy::disallowed_methods)]
let (writer, reader) = unbounded_channel();
(
Connection::new(ConnectionConfig {
Expand Down
6 changes: 6 additions & 0 deletions rs/bitcoin/adapter/src/connectionmanager.rs
Expand Up @@ -300,6 +300,7 @@ impl ConnectionManager {
if self.connections.contains_key(&address) {
return Err(ConnectionManagerError::AlreadyConnected(address));
}
#[allow(clippy::disallowed_methods)]
let (writer, network_message_receiver) = unbounded_channel();
let stream_event_sender = self.stream_event_sender.clone();
let network_message_sender = self.network_message_sender.clone();
Expand Down Expand Up @@ -944,6 +945,7 @@ mod test {
RouterMetrics::new(&MetricsRegistry::default()),
);
let timestamp = SystemTime::now() - Duration::from_secs(60);
#[allow(clippy::disallowed_methods)]
let (writer, _) = unbounded_channel();
runtime.block_on(async {
let conn = Connection::new_with_state(
Expand Down Expand Up @@ -989,6 +991,7 @@ mod test {
);
let timestamp1 = SystemTime::now() - Duration::from_secs(SEED_ADDR_RETRIEVED_TIMEOUT_SECS);
let timestamp2 = SystemTime::now() + Duration::from_secs(SEED_ADDR_RETRIEVED_TIMEOUT_SECS);
#[allow(clippy::disallowed_methods)]
let (writer, _) = unbounded_channel();
runtime.block_on(async {
let conn = Connection::new_with_state(
Expand Down Expand Up @@ -1058,6 +1061,7 @@ mod test {
);
let timestamp1 = SystemTime::now() - Duration::from_secs(SEED_ADDR_RETRIEVED_TIMEOUT_SECS);
let timestamp2 = SystemTime::now() + Duration::from_secs(SEED_ADDR_RETRIEVED_TIMEOUT_SECS);
#[allow(clippy::disallowed_methods)]
let (writer, _) = unbounded_channel();
runtime.block_on(async {
let conn = Connection::new_with_state(
Expand Down Expand Up @@ -1118,6 +1122,7 @@ mod test {
network_message_sender,
RouterMetrics::new(&MetricsRegistry::default()),
);
#[allow(clippy::disallowed_methods)]
let (writer, _) = unbounded_channel();
let conn = Connection::new_with_state(
ConnectionConfig {
Expand Down Expand Up @@ -1164,6 +1169,7 @@ mod test {
network_message_sender,
RouterMetrics::new(&MetricsRegistry::default()),
);
#[allow(clippy::disallowed_methods)]
let (writer, _) = unbounded_channel();
let conn = Connection::new_with_state(
ConnectionConfig {
Expand Down
4 changes: 4 additions & 0 deletions rs/bitcoin/adapter/src/stream.rs
Expand Up @@ -368,6 +368,7 @@ pub fn handle_stream(config: StreamConfig) -> tokio::task::JoinHandle<()> {

#[cfg(test)]
pub mod test {

use std::net::{IpAddr, Ipv4Addr};

use crate::common::DEFAULT_CHANNEL_BUFFER_SIZE;
Expand All @@ -377,6 +378,7 @@ pub mod test {
use ic_logger::replica_logger::no_op_logger;

/// Test that large messages get rejected and we disconnect as a consequence.
#[allow(clippy::disallowed_methods)]
#[tokio::test]
async fn read_huge_message_from_network() {
let network = Network::Bitcoin;
Expand Down Expand Up @@ -433,6 +435,7 @@ pub mod test {
async fn initialization_times_out_after_five_seconds() {
let network = Network::Bitcoin;
let (net_tx, _) = tokio::sync::mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
#[allow(clippy::disallowed_methods)]
let (_adapter_tx, adapter_rx) = tokio::sync::mpsc::unbounded_channel();
let (stream_tx, _) = tokio::sync::mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);

Expand Down Expand Up @@ -462,6 +465,7 @@ pub mod test {
async fn read_two_messages_at_size_boundary() {
let network = Network::Bitcoin;
let (net_tx, mut net_rx) = tokio::sync::mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
#[allow(clippy::disallowed_methods)]
let (_adapter_tx, adapter_rx) = tokio::sync::mpsc::unbounded_channel();
let (stream_tx, mut stream_rx) = tokio::sync::mpsc::channel(DEFAULT_CHANNEL_BUFFER_SIZE);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
Expand Down
Expand Up @@ -195,7 +195,7 @@ fn new_call_service(
// .returning(|| false);

let ingress_throttler = Arc::new(RwLock::new(ingress_pool_throttler));

#[allow(clippy::disallowed_methods)]
let (ingress_tx, _ingress_rx) = tokio::sync::mpsc::unbounded_channel();

let sig_verifier = Arc::new(temp_crypto_component_with_fake_registry(node_test_id(1)));
Expand Down
1 change: 1 addition & 0 deletions rs/http_endpoints/public/tests/common/mod.rs
Expand Up @@ -475,6 +475,7 @@ impl HttpEndpointBuilder {
let sig_verifier = Arc::new(temp_crypto_component_with_fake_registry(node_test_id(0)));
let crypto = Arc::new(CryptoReturningOk::default());

#[allow(clippy::disallowed_methods)]
let (ingress_tx, ingress_rx) = tokio::sync::mpsc::unbounded_channel();
let mut ingress_pool_throtller = MockIngressPoolThrottler::new();
ingress_pool_throtller
Expand Down
2 changes: 2 additions & 0 deletions rs/nns/governance/tests/interleaving_tests.rs
Expand Up @@ -43,6 +43,7 @@ fn test_cant_increase_dissolve_delay_while_disbursing() {

// We use channels to control how the disbursing and delay increase are
// interleaved
#[allow(clippy::disallowed_methods)]
let (tx, mut rx) = mpsc::unbounded::<LedgerControlMessage>();
// Once we're done with disbursing, we will need to manually close the above
// channel to terminate the test.
Expand Down Expand Up @@ -164,6 +165,7 @@ fn test_cant_interleave_calls_to_settle_neurons_fund() {
PolynomialMatchingFunction::new(total_nf_maturity_equivalent_icp_e8s).unwrap();

// We use channels to control how the cals are interleaved
#[allow(clippy::disallowed_methods)]
let (tx, mut rx) = mpsc::unbounded::<LedgerControlMessage>();
// Once we're done with the successful settle, we will need to manually close the above
// channel to terminate the test.
Expand Down
2 changes: 2 additions & 0 deletions rs/p2p/consensus_manager/src/receiver.rs
@@ -1,3 +1,5 @@
#![allow(clippy::disallowed_methods)]

use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::{Arc, RwLock},
Expand Down
2 changes: 2 additions & 0 deletions rs/p2p/consensus_manager/src/sender.rs
@@ -1,3 +1,5 @@
#![allow(clippy::disallowed_methods)]

use std::{
collections::{hash_map::Entry, HashMap},
panic,
Expand Down
2 changes: 2 additions & 0 deletions rs/p2p/memory_transport/src/lib.rs
Expand Up @@ -93,6 +93,7 @@ pub struct TransportRouter {
}

impl TransportRouter {
#[allow(clippy::disallowed_methods)]
pub fn new() -> Self {
let (router_req_tx, mut router_req_rx) =
unbounded_channel::<(Request<Bytes>, NodeId, oneshot::Sender<Response<Bytes>>)>();
Expand Down Expand Up @@ -133,6 +134,7 @@ impl TransportRouter {
) -> PeerTransport {
// It is fine to use unbounded channel since ingestion rate is limited by
// capacity and processing rate >> ingestion rate.
#[allow(clippy::disallowed_methods)]
let (rpc_tx, mut rpc_rx) =
unbounded_channel::<(Request<Bytes>, oneshot::Sender<Response<Bytes>>)>();
self.peers
Expand Down
1 change: 1 addition & 0 deletions rs/p2p/test_utils/src/turmoil.rs
Expand Up @@ -217,6 +217,7 @@ pub fn add_peer_manager_to_sim(
RegistryConsensusHandle,
) {
let (peer_manager_sender, mut peer_manager_receiver) = oneshot::channel();
#[allow(clippy::disallowed_methods)]
let (peer_manager_cmd_sender, mut peer_manager_cmd_receiver) = mpsc::unbounded_channel();
sim.client("peer-manager", async move {
let rt = tokio::runtime::Handle::current();
Expand Down
2 changes: 1 addition & 1 deletion rs/pocket_ic_server/src/state_api/state.rs
Expand Up @@ -125,7 +125,7 @@ where
let instances = RwLock::new(instances);

let sync_wait_time = self.sync_wait_time.unwrap_or(DEFAULT_SYNC_WAIT_DURATION);

#[allow(clippy::disallowed_methods)]
let (drop_sender, mut rx) = mpsc::unbounded_channel::<T>();
let drop_handle = ThreadBuilder::new()
.name("PocketIC GC Thread".into())
Expand Down
1 change: 1 addition & 0 deletions rs/sns/governance/tests/interleaving_tests.rs
Expand Up @@ -46,6 +46,7 @@ fn test_cant_increase_dissolve_delay_while_disbursing() {

// We use channels to control how the disbursing and delay increase are
// interleaved
#[allow(clippy::disallowed_methods)]
let (tx, mut rx) = mpsc::unbounded::<LedgerControlMessage>();

let canister_fixture = canister_fixture_builder
Expand Down
1 change: 1 addition & 0 deletions rs/sns/swap/tests/swap.rs
Expand Up @@ -2227,6 +2227,7 @@ fn test_finalize_swap_rejects_concurrent_calls() {
// to the finalize method will block on ledger calls, and continue only when messages are
// drained from the channel. We can use this technique to guarantee ordering of API calls
// across message blocks.
#[allow(clippy::disallowed_methods)]
let (sender_channel, mut receiver_channel) = mpsc::unbounded::<LedgerControlMessage>();

let mut clients = CanisterClients {
Expand Down
2 changes: 2 additions & 0 deletions rs/state_manager/src/checkpoint.rs
Expand Up @@ -83,6 +83,7 @@ pub(crate) fn make_checkpoint(
.make_checkpoint_step_duration
.with_label_values(&["tip_to_checkpoint"])
.start_timer();
#[allow(clippy::disallowed_methods)]
let (send, recv) = unbounded();
tip_channel
.send(TipRequest::TipToCheckpoint {
Expand All @@ -109,6 +110,7 @@ pub(crate) fn make_checkpoint(
.make_checkpoint_step_duration
.with_label_values(&["wait_for_reflinking"])
.start_timer();
#[allow(clippy::disallowed_methods)]
let (send, recv) = unbounded();
tip_channel.send(TipRequest::Wait { sender: send }).unwrap();
recv.recv().unwrap();
Expand Down
2 changes: 2 additions & 0 deletions rs/state_manager/src/lib.rs
Expand Up @@ -1332,6 +1332,7 @@ struct CreateCheckpointResult {

impl StateManagerImpl {
pub fn flush_tip_channel(&self) {
#[allow(clippy::disallowed_methods)]
let (sender, recv) = unbounded();
self.tip_channel
.send(TipRequest::Wait { sender })
Expand Down Expand Up @@ -1560,6 +1561,7 @@ impl StateManagerImpl {

let persist_metadata_guard = Arc::new(Mutex::new(()));

#[allow(clippy::disallowed_methods)]
let (deallocation_sender, deallocation_receiver) = unbounded();
let _deallocation_handle = JoinOnDrop::new(
std::thread::Builder::new()
Expand Down
1 change: 1 addition & 0 deletions rs/state_manager/src/tip.rs
Expand Up @@ -188,6 +188,7 @@ pub(crate) fn spawn_tip_thread(
metrics: StateManagerMetrics,
malicious_flags: MaliciousFlags,
) -> (JoinOnDrop<()>, Sender<TipRequest>) {
#[allow(clippy::disallowed_methods)]
let (tip_sender, tip_receiver) = unbounded();
let mut thread_pool = scoped_threadpool::Pool::new(NUMBER_OF_CHECKPOINT_THREADS);
let mut tip_state = TipState::ReadyForPageDeltas(Height::from(0));
Expand Down
1 change: 1 addition & 0 deletions rs/state_manager/tests/state_manager.rs
Expand Up @@ -355,6 +355,7 @@ fn merge_overhead() {
assert!(last_checkpoint_size(&env) / state_in_memory(&env) <= 2.5);
}

#[allow(clippy::disallowed_methods)]
#[test]
fn skipping_flushing_is_invisible_for_state() {
fn skips(env: &StateMachine) -> f64 {
Expand Down
2 changes: 2 additions & 0 deletions rs/tests/src/driver/subprocess_ipc.rs
Expand Up @@ -237,6 +237,7 @@ mod tests {
fn can_send_and_receive_messages() {
let rt = Runtime::new().expect("failed to create tokio runtime");
let sock_path = get_unique_sock_path();
#[allow(clippy::disallowed_methods)]
let (log_send, log_rcvr_chan) = unbounded();
let parent_drain = ParentDrain(log_send);
let parent_logger = Logger::root(parent_drain, o!());
Expand Down Expand Up @@ -281,6 +282,7 @@ mod tests {
let rt = Runtime::new().expect("failed to create tokio runtime");
let sock_path = get_unique_sock_path();

#[allow(clippy::disallowed_methods)]
let (log_send, _log_rcvr_chan) = unbounded();
let parent_drain = ParentDrain(log_send);
let parent_logger = Logger::root(parent_drain, o!());
Expand Down
1 change: 1 addition & 0 deletions rs/tests/src/driver/task.rs
Expand Up @@ -194,6 +194,7 @@ mod tests {
#[test]
fn uninterrupted_empty_task_sends_no_events() {
let expected_task_id = TaskId::Test("test-id".to_string());
#[allow(clippy::disallowed_methods)]
let (evt_send, evt_rcv) = unbounded();

let t = EmptyTask::new(expected_task_id);
Expand Down
1 change: 1 addition & 0 deletions rs/tests/src/driver/task_scheduler.rs
Expand Up @@ -39,6 +39,7 @@ pub struct TaskScheduler {
impl TaskScheduler {
#[allow(clippy::map_entry)]
pub fn execute(&mut self, dbg_keepalive: bool) {
#[allow(clippy::disallowed_methods)]
let (event_tx, event_rx) = crossbeam_channel::unbounded();
let log = &self.log;
self.action_graph.start();
Expand Down
2 changes: 2 additions & 0 deletions rs/tests/src/driver/timeout.rs
Expand Up @@ -86,6 +86,7 @@ mod tests {
let rt = create_rt();
let d = ms(1);
let expected_task_id = TaskId::Test("test-id".to_string());
#[allow(clippy::disallowed_methods)]
let (evt_send, evt_rcv) = unbounded();

let t = TimeoutTask::new(rt.handle().clone(), d, expected_task_id.clone());
Expand All @@ -103,6 +104,7 @@ mod tests {
let rt = create_rt();
let d = ms(2000);
let expected_task_id = TaskId::Test("test-id".to_string());
#[allow(clippy::disallowed_methods)]
let (evt_send, _evt_rcv) = unbounded();
let evt_send2 = evt_send.clone();

Expand Down
1 change: 1 addition & 0 deletions rs/tests/src/generic_workload_engine/engine.rs
Expand Up @@ -49,6 +49,7 @@ where
{
let log = self.log;
let futures_count = (self.rps * self.duration.as_secs_f64()).floor() as usize;
#[allow(clippy::disallowed_methods)]
let (fut_snd, mut fut_rcv) = tokio::sync::mpsc::unbounded_channel();
info!(
log,
Expand Down

0 comments on commit 23cfc89

Please sign in to comment.