From 8a9d0eb75ce8680c6db9a64c0b3bcdba47ec4902 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 21 Oct 2021 14:21:54 +1000 Subject: [PATCH] Create an ActiveConnectionCounter type to manage connection tracking --- zebra-network/src/peer_set.rs | 2 + zebra-network/src/peer_set/initialize.rs | 168 ++++++++--------------- 2 files changed, 56 insertions(+), 114 deletions(-) diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 072dcdb4f63..48b6058bfef 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -1,10 +1,12 @@ pub(crate) mod candidate_set; mod initialize; mod inventory_registry; +mod limit; mod set; mod unready_service; pub(crate) use candidate_set::CandidateSet; +pub(crate) use limit::ActiveConnectionCounter; pub(crate) use set::MorePeers; use inventory_registry::InventoryRegistry; diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 98cd55df5bb..27ba0606a9a 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -3,13 +3,7 @@ // Portions of this submodule were adapted from tower-balance, // which is (c) 2019 Tower Contributors (MIT licensed). -use std::{ - net::SocketAddr, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; +use std::{net::SocketAddr, sync::Arc}; use futures::{ channel::mpsc, @@ -32,7 +26,7 @@ use crate::{ constants, meta_addr::MetaAddr, peer, - peer_set::{CandidateSet, MorePeers, PeerSet}, + peer_set::{ActiveConnectionCounter, CandidateSet, MorePeers, PeerSet}, timestamp_collector::TimestampCollector, AddressBook, BoxError, Config, Request, Response, }; @@ -45,17 +39,6 @@ mod tests; /// This result comes from the [`Handshaker`]. type PeerChange = Result, BoxError>; -/// A signal sent by a [`Connection`] when it closes. -/// -/// Used to count the number of open connections. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -struct ConnectionClosed; - -/// The current number of peers connected to Zebra. -/// -/// TODO: replace with non-atomic usize in the inbound/outbound tasks in #2904 -static PEER_CONNECTION_COUNT: AtomicUsize = AtomicUsize::new(0); - /// Initialize a peer set, using a network `config`, `inbound_service`, /// and `latest_chain_tip`. /// @@ -126,9 +109,6 @@ where let (peerset_tx, peerset_rx) = mpsc::channel::(100); // Create an mpsc channel for peerset demand signaling. let (mut demand_tx, demand_rx) = mpsc::channel::(100); - // Create an mpsc channel to count open connections. - // This channel will be bounded by the connection limit (#1850, #1851, #2902). - let (connection_tx, mut connection_rx) = mpsc::unbounded::(); // Create a oneshot to send background task JoinHandles to the peer set let (handle_tx, handle_rx) = tokio::sync::oneshot::channel(); @@ -154,66 +134,42 @@ where // // 1. Incoming peer connections, via a listener. let listen_guard = tokio::spawn( - accept_inbound_connections( - tcp_listener, - listen_handshaker, - peerset_tx.clone(), - connection_tx.clone(), - ) - .instrument(Span::current()), + accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone()) + .instrument(Span::current()), ); // 2. Initial peers, specified in the config. - let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel(); let initial_peers_fut = { let config = config.clone(); let outbound_connector = outbound_connector.clone(); let peerset_tx = peerset_tx.clone(); - let connection_tx = connection_tx.clone(); async move { let initial_peers = config.initial_peers().await; - let _ = initial_peer_count_tx.send(initial_peers.len()); - add_initial_peers(initial_peers, outbound_connector, peerset_tx, connection_tx).await + add_initial_peers(initial_peers, outbound_connector, peerset_tx).await } .boxed() }; - let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current())); + let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current())); // 3. Outgoing peers we connect to in response to load. let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); - // Keep the connection count up to date - // - // TODO: replace with checking the receiver in the inbound/outbound tasks in #2904 - let connection_count_fut = { - async move { - // if all the senders are closed, we're probably shutting down, so exit anyway - while let Some(ConnectionClosed) = connection_rx.next().await { - // A connection was closed. - let open_connections = PEER_CONNECTION_COUNT.fetch_sub(1, Ordering::SeqCst); - info!(?open_connections, "a peer connection closed"); - } - - let open_connections = PEER_CONNECTION_COUNT.load(Ordering::SeqCst); - info!(?open_connections, "stopping connection tracker"); - - Ok(()) - } - .boxed() - }; - - let connection_count_guard = tokio::spawn(connection_count_fut.instrument(Span::current())); + // Wait for the initial seed peer count + let mut active_outbound_connections = initial_peers_join + .await + .expect("unexpected panic in spawned initial peers task") + .expect("unexpected error connecting to initial peers"); + let initial_peer_count = active_outbound_connections.update_count(); // We need to await candidates.update() here, because zcashd only sends one // `addr` message per connection, and if we only have one initial peer we // need to ensure that its `addr` message is used by the crawler. info!("Sending initial request for peers"); - let _ = candidates - .update_initial(initial_peer_count_rx.await.expect("value sent before drop")) - .await; + let _ = candidates.update_initial(initial_peer_count).await; + // TODO: reduce demand by initial_peer_count (#2902) for _ in 0..config.peerset_initial_target_size { let _ = demand_tx.try_send(MorePeers); } @@ -226,34 +182,24 @@ where candidates, outbound_connector, peerset_tx, - connection_tx.clone(), + active_outbound_connections, ) .instrument(Span::current()), ); - handle_tx - .send(vec![ - add_guard, - listen_guard, - crawl_guard, - connection_count_guard, - ]) - .unwrap(); + handle_tx.send(vec![listen_guard, crawl_guard]).unwrap(); (peer_set, address_book) } /// Use the provided `handshaker` to connect to `initial_peers`, then send /// the results over `peerset_tx`. -/// -/// When a connection closes, send a close notification on `connection_tx`. -#[instrument(skip(initial_peers, outbound_connector, peerset_tx, connection_tx))] +#[instrument(skip(initial_peers, outbound_connector, peerset_tx))] async fn add_initial_peers( initial_peers: std::collections::HashSet, outbound_connector: S, mut peerset_tx: mpsc::Sender, - connection_tx: mpsc::UnboundedSender, -) -> Result<(), BoxError> +) -> Result where S: Service, Error = BoxError> + Clone, S::Future: Send + 'static, @@ -262,12 +208,16 @@ where let mut handshake_success_total: usize = 0; let mut handshake_error_total: usize = 0; + let mut active_outbound_connections = ActiveConnectionCounter::new_counter(); + info!( ?initial_peer_count, ?initial_peers, "connecting to initial peer set" ); + // TODO: send the connection trackers to the outbound connector + // # Security // // TODO: rate-limit initial seed peer connections (#2326) @@ -288,17 +238,14 @@ where }) .collect(); - // Increment the connection count before we concurrently open all the connections. - // - // TODO: replace with non-atomic usize in the inbound/outbound tasks in #2904 - let max_open_connections = - PEER_CONNECTION_COUNT.fetch_add(initial_peer_count, Ordering::SeqCst); - info!( - ?max_open_connections, - "opening connections to initial seed peers" - ); - while let Some(handshake_result) = handshakes.next().await { + // TODO: we have already opened the connection - track it before opening + let connection_tracker = active_outbound_connections.track_connection(); + debug!( + outbound_connections = ?active_outbound_connections.update_count(), + "opened an initial seed peer connection" + ); + match handshake_result { Ok(ref change) => { handshake_success_total += 1; @@ -311,10 +258,7 @@ where } Err((addr, ref e)) => { // The connection was never opened, or it failed the handshake and was dropped. - // - // We ignore disconnected errors, because the receiver task can shut down - // before some connections are dropped. - let _ = connection_tx.unbounded_send(ConnectionClosed); + std::mem::drop(connection_tracker); handshake_error_total += 1; // this is verbose, but it's better than just hanging with no output when there are errors @@ -333,15 +277,15 @@ where .await?; } - let open_connections = PEER_CONNECTION_COUNT.load(Ordering::SeqCst); + let outbound_connections = active_outbound_connections.update_count(); info!( ?handshake_success_total, ?handshake_error_total, - ?open_connections, + ?outbound_connections, "finished connecting to initial seed peers" ); - Ok(()) + Ok(active_outbound_connections) } /// Open a peer connection listener on `config.listen_addr`, @@ -401,27 +345,24 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) { /// /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends /// the [`peer::Client`] result over `peerset_tx`. -/// -/// When a connection closes, send a close notification on `connection_tx`. -#[instrument(skip(listener, handshaker, peerset_tx, connection_tx), fields(listener_addr = ?listener.local_addr()))] +#[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))] async fn accept_inbound_connections( listener: TcpListener, mut handshaker: S, peerset_tx: mpsc::Sender, - connection_tx: mpsc::UnboundedSender, ) -> Result<(), BoxError> where S: Service + Clone, S::Future: Send + 'static, { + let mut active_inbound_connections = ActiveConnectionCounter::new_counter(); + loop { if let Ok((tcp_stream, addr)) = listener.accept().await { // The peer already opened a connection, so increment the connection count immediately. - // - // TODO: replace with non-atomic usize in the inbound/outbound tasks in #2904 - let open_connections = PEER_CONNECTION_COUNT.fetch_add(1, Ordering::SeqCst); + let connection_tracker = active_inbound_connections.track_connection(); info!( - ?open_connections, + inbound_connections = ?active_inbound_connections.update_count(), "handshaking on an open inbound peer connection" ); @@ -433,22 +374,21 @@ where handshaker.ready_and().await?; // TODO: distinguish between proxied listeners and direct listeners let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); + + // TODO: send the connection tracker to the handshaker + // Construct a handshake future but do not drive it yet.... let handshake = handshaker.call((tcp_stream, connected_addr)); // ... instead, spawn a new task to handle this connection { let mut peerset_tx = peerset_tx.clone(); - let connection_tx = connection_tx.clone(); tokio::spawn( async move { if let Ok(client) = handshake.await { let _ = peerset_tx.send(Ok(Change::Insert(addr, client))).await; } else { // The connection failed the handshake and was dropped. - // - // We ignore disconnected errors, because the receiver task can shut down - // before some connections are dropped. - let _ = connection_tx.unbounded_send(ConnectionClosed); + std::mem::drop(connection_tracker); } } .instrument(handshaker_span), @@ -493,14 +433,15 @@ enum CrawlerAction { /// permanent internal error. Transient errors and individual peer errors should /// be handled within the crawler. /// -/// When a connection closes, send a close notification on `connection_tx`. +/// Uses `active_outbound_connections` to track active outbound connections +/// in both the initial peers and crawler. #[instrument(skip( demand_tx, demand_rx, candidates, outbound_connector, peerset_tx, - connection_tx + active_outbound_connections, ))] async fn crawl_and_dial( crawl_new_peer_interval: std::time::Duration, @@ -509,7 +450,7 @@ async fn crawl_and_dial( mut candidates: CandidateSet, outbound_connector: C, mut peerset_tx: mpsc::Sender, - connection_tx: mpsc::UnboundedSender, + mut active_outbound_connections: ActiveConnectionCounter, ) -> Result<(), BoxError> where C: Service, Error = BoxError> @@ -580,10 +521,13 @@ where } DemandHandshake { candidate } => { // Increment the connection count before we spawn the connection. - // - // TODO: replace with non-atomic usize in the inbound/outbound tasks in #2904 - let open_connections = PEER_CONNECTION_COUNT.fetch_add(1, Ordering::SeqCst); - info!(?open_connections, "opening an outbound peer connection"); + let _connection_tracker = active_outbound_connections.track_connection(); + info!( + outbound_connections = ?active_outbound_connections.update_count(), + "opening an outbound peer connection" + ); + + // TODO: send the connection tracker to the handshaker // Spawn each handshake into an independent task, so it can make // progress independently of the crawls. @@ -631,9 +575,7 @@ where HandshakeFailed { failed_addr } => { // The connection was never opened, or it failed the handshake and was dropped. // - // We ignore disconnected errors, because the receiver task can shut down - // before some connections are dropped. - let _ = connection_tx.unbounded_send(ConnectionClosed); + // TODO: drop the connection tracker here? debug!(?failed_addr.addr, "marking candidate as failed"); candidates.report_failed(&failed_addr); @@ -650,8 +592,6 @@ where /// /// Returns a `HandshakeConnected` action on success, and a /// `HandshakeFailed` action on error. -/// -/// When a connection closes, send a close notification on `connection_tx`. #[instrument(skip(outbound_connector))] async fn dial(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction where