From 4cdd12e2c49040a5d5050b7b749f15ce4d27cb5d Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 22 Oct 2021 07:36:42 +1000 Subject: [PATCH] Track the number of active inbound and outbound peer connections (#2912) * Count the number of active inbound and outbound peer connections And reduce the count when each connection fails. * Fix a comment typo Co-authored-by: Alfredo Garcia Co-authored-by: Alfredo Garcia --- zebra-network/src/isolated.rs | 19 ++- zebra-network/src/peer.rs | 7 +- zebra-network/src/peer/connection.rs | 38 ++++- zebra-network/src/peer/connector.rs | 38 ++++- zebra-network/src/peer/handshake.rs | 28 +++- zebra-network/src/peer_set.rs | 3 + zebra-network/src/peer_set/initialize.rs | 204 +++++++++++++++-------- zebra-network/src/peer_set/limit.rs | 125 ++++++++++++++ 8 files changed, 368 insertions(+), 94 deletions(-) create mode 100644 zebra-network/src/peer_set/limit.rs diff --git a/zebra-network/src/isolated.rs b/zebra-network/src/isolated.rs index 0fb5d2a09cb..1d941b856ac 100644 --- a/zebra-network/src/isolated.rs +++ b/zebra-network/src/isolated.rs @@ -16,7 +16,8 @@ use tower::{ use zebra_chain::chain_tip::NoChainTip; use crate::{ - peer::{self, ConnectedAddr}, + peer::{self, ConnectedAddr, HandshakeRequest}, + peer_set::ActiveConnectionCounter, BoxError, Config, Request, Response, }; @@ -62,11 +63,19 @@ pub fn connect_isolated( .finish() .expect("provided mandatory builder parameters"); - // Don't send any metadata about the connection + // Don't send or track any metadata about the connection let connected_addr = ConnectedAddr::new_isolated(); - - Oneshot::new(handshake, (conn, connected_addr)) - .map_ok(|client| BoxService::new(Wrapper(client))) + let connection_tracker = ActiveConnectionCounter::new_counter().track_connection(); + + Oneshot::new( + handshake, + HandshakeRequest { + tcp_stream: conn, + connected_addr, + connection_tracker, + }, + ) + .map_ok(|client| BoxService::new(Wrapper(client))) } // This can be deleted when a new version of Tower with map_err is released. diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index b87c1eb8202..083246c77b8 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -11,14 +11,11 @@ mod error; /// Performs peer handshakes. mod handshake; -use client::ClientRequest; -use client::ClientRequestReceiver; -use client::InProgressClientRequest; -use client::MustUseOneshotSender; +use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender}; use error::ErrorSlot; pub use client::Client; pub use connection::Connection; -pub use connector::Connector; +pub use connector::{Connector, OutboundConnectorRequest}; pub use error::{HandshakeError, PeerError, SharedPeerError}; pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest}; diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index bc0aaa0208d..b6eae8b4fb4 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -26,6 +26,11 @@ use zebra_chain::{ use crate::{ constants, + peer::{ + ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError, + SharedPeerError, + }, + peer_set::ConnectionTracker, protocol::{ external::{types::Nonce, InventoryHash, Message}, internal::{Request, Response}, @@ -33,11 +38,6 @@ use crate::{ BoxError, }; -use super::{ - ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError, - SharedPeerError, -}; - #[derive(Debug)] pub(super) enum Handler { /// Indicates that the handler has finished processing the request. @@ -314,19 +314,41 @@ pub(super) enum State { /// The state associated with a peer connection. pub struct Connection { + /// The state of this connection's current request or response. pub(super) state: State, + /// A timeout for a client request. This is stored separately from /// State so that we can move the future out of it independently of /// other state handling. pub(super) request_timer: Option, + + /// The `inbound` service, used to answer requests from this connection's peer. pub(super) svc: S, - /// A `mpsc::Receiver` that converts its results to - /// `InProgressClientRequest` + + /// A channel that receives network requests from the rest of Zebra. + /// + /// This channel produces `InProgressClientRequest`s. pub(super) client_rx: ClientRequestReceiver, + /// A slot for an error shared between the Connection and the Client that uses it. pub(super) error_slot: ErrorSlot, - //pub(super) peer_rx: Rx, + + /// A channel for sending requests to the connected peer. pub(super) peer_tx: Tx, + + /// A connection tracker that reduces the open connection count when dropped. + /// Used to limit the number of open connections in Zebra. + /// + /// This field does nothing until it is dropped. + /// + /// # Security + /// + /// If this connection tracker or `Connection`s are leaked, + /// the number of active connections will appear higher than it actually is. + /// + /// Eventually, Zebra could stop making connections entirely. + #[allow(dead_code)] + pub(super) connection_tracker: ConnectionTracker, } impl Connection diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index e8ba6e4b6bc..7cf9a585dfb 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -12,9 +12,11 @@ use tracing_futures::Instrument; use zebra_chain::chain_tip::{ChainTip, NoChainTip}; -use crate::{BoxError, Request, Response}; - -use super::{Client, ConnectedAddr, Handshake}; +use crate::{ + peer::{Client, ConnectedAddr, Handshake, HandshakeRequest}, + peer_set::ConnectionTracker, + BoxError, Request, Response, +}; /// A wrapper around [`peer::Handshake`] that opens a TCP connection before /// forwarding to the inner handshake service. Writing this as its own @@ -37,7 +39,19 @@ impl Connector { } } -impl Service for Connector +/// A connector request. +/// Contains the information needed to make an outbound connection to the peer. +pub struct OutboundConnectorRequest { + /// The Zcash listener address of the peer. + pub addr: SocketAddr, + + /// A connection tracker that reduces the open connection count when dropped. + /// + /// Used to limit the number of open connections in Zebra. + pub connection_tracker: ConnectionTracker, +} + +impl Service for Connector where S: Service + Clone + Send + 'static, S::Future: Send, @@ -52,14 +66,26 @@ where Poll::Ready(Ok(())) } - fn call(&mut self, addr: SocketAddr) -> Self::Future { + fn call(&mut self, req: OutboundConnectorRequest) -> Self::Future { + let OutboundConnectorRequest { + addr, + connection_tracker, + }: OutboundConnectorRequest = req; + let mut hs = self.handshaker.clone(); let connected_addr = ConnectedAddr::new_outbound_direct(addr); let connector_span = info_span!("connector", peer = ?connected_addr); + async move { let stream = TcpStream::connect(addr).await?; hs.ready_and().await?; - let client = hs.call((stream, connected_addr)).await?; + let client = hs + .call(HandshakeRequest { + tcp_stream: stream, + connected_addr, + connection_tracker, + }) + .await?; Ok(Change::Insert(addr, client)) } .instrument(connector_span) diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index eb7d1304458..fda4493a2c8 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -29,6 +29,8 @@ use zebra_chain::{ use crate::{ constants, meta_addr::MetaAddrChange, + peer::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError}, + peer_set::ConnectionTracker, protocol::{ external::{types::*, Codec, InventoryHash, Message}, internal::{Request, Response}, @@ -37,8 +39,6 @@ use crate::{ BoxError, Config, }; -use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError}; - /// A [`Service`] that handshakes with a remote peer and constructs a /// client/server pair. /// @@ -658,7 +658,20 @@ pub async fn negotiate_version( Ok((remote_version, remote_services, remote_canonical_addr)) } -pub type HandshakeRequest = (TcpStream, ConnectedAddr); +/// A handshake request. +/// Contains the information needed to handshake with the peer. +pub struct HandshakeRequest { + /// The TCP connection to the peer. + pub tcp_stream: TcpStream, + + /// The address of the peer, and other related information. + pub connected_addr: ConnectedAddr, + + /// A connection tracker that reduces the open connection count when dropped. + /// + /// Used to limit the number of open connections in Zebra. + pub connection_tracker: ConnectionTracker, +} impl Service for Handshake where @@ -676,7 +689,11 @@ where } fn call(&mut self, req: HandshakeRequest) -> Self::Future { - let (tcp_stream, connected_addr) = req; + let HandshakeRequest { + tcp_stream, + connected_addr, + connection_tracker, + } = req; let negotiator_span = debug_span!("negotiator", peer = ?connected_addr); // set the peer connection span's parent to the global span, as it @@ -892,11 +909,12 @@ where use super::connection; let server = Connection { state: connection::State::AwaitingRequest, + request_timer: None, svc: inbound_service, client_rx: server_rx.into(), error_slot: slot, peer_tx, - request_timer: None, + connection_tracker, }; tokio::spawn( diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index c5c4da4358d..04a2476d488 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -1,10 +1,13 @@ pub(crate) mod candidate_set; mod initialize; mod inventory_registry; +mod limit; mod set; mod unready_service; pub(crate) use candidate_set::CandidateSet; +pub(crate) use limit::{ActiveConnectionCounter, ConnectionTracker}; + use inventory_registry::InventoryRegistry; use set::PeerSet; diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index d36f253f95d..df418f60485 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -20,20 +20,23 @@ use tower::{ use tracing::Span; use tracing_futures::Instrument; -use crate::{ - constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, AddressBook, - BoxError, Config, Request, Response, -}; - use zebra_chain::{chain_tip::ChainTip, parameters::Network}; -use super::{CandidateSet, PeerSet}; - -use peer::Client; +use crate::{ + constants, + meta_addr::MetaAddr, + peer::{self, HandshakeRequest, OutboundConnectorRequest}, + peer_set::{ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet}, + timestamp_collector::TimestampCollector, + AddressBook, BoxError, Config, Request, Response, +}; #[cfg(test)] mod tests; +/// The result of an outbound peer connection attempt or inbound connection handshake. +/// +/// This result comes from the [`Handshaker`]. type PeerChange = Result, BoxError>; /// Initialize a peer set, using a network `config`, `inbound_service`, @@ -125,6 +128,8 @@ where ); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); + // Connect peerset_tx to the 3 peer sources: + // // 1. Incoming peer connections, via a listener. let listen_guard = tokio::spawn( accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone()) @@ -132,33 +137,38 @@ where ); // 2. Initial peers, specified in the config. - let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel(); let initial_peers_fut = { let config = config.clone(); let outbound_connector = outbound_connector.clone(); let peerset_tx = peerset_tx.clone(); async move { let initial_peers = config.initial_peers().await; - let _ = initial_peer_count_tx.send(initial_peers.len()); - // Connect the tx end to the 3 peer sources: add_initial_peers(initial_peers, outbound_connector, peerset_tx).await } .boxed() }; - let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current())); + let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current())); // 3. Outgoing peers we connect to in response to load. let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); + // Wait for the initial seed peer count + let mut active_outbound_connections = initial_peers_join + .await + .expect("unexpected panic in spawned initial peers task") + .expect("unexpected error connecting to initial peers"); + let active_initial_peer_count = active_outbound_connections.update_count(); + // We need to await candidates.update() here, because zcashd only sends one // `addr` message per connection, and if we only have one initial peer we // need to ensure that its `addr` message is used by the crawler. - info!("Sending initial request for peers"); - let _ = candidates - .update_initial(initial_peer_count_rx.await.expect("value sent before drop")) - .await; + info!( + ?active_initial_peer_count, + "sending initial request for peers" + ); + let _ = candidates.update_initial(active_initial_peer_count).await; for _ in 0..config.peerset_initial_target_size { let _ = demand_tx.try_send(()); @@ -172,33 +182,38 @@ where candidates, outbound_connector, peerset_tx, + active_outbound_connections, ) .instrument(Span::current()), ); - handle_tx - .send(vec![add_guard, listen_guard, crawl_guard]) - .unwrap(); + handle_tx.send(vec![listen_guard, crawl_guard]).unwrap(); (peer_set, address_book) } /// Use the provided `handshaker` to connect to `initial_peers`, then send -/// the results over `tx`. -#[instrument(skip(initial_peers, outbound_connector, tx))] +/// the results over `peerset_tx`. +#[instrument(skip(initial_peers, outbound_connector, peerset_tx))] async fn add_initial_peers( initial_peers: std::collections::HashSet, outbound_connector: S, - mut tx: mpsc::Sender, -) -> Result<(), BoxError> + mut peerset_tx: mpsc::Sender, +) -> Result where - S: Service, Error = BoxError> + Clone, + S: Service< + OutboundConnectorRequest, + Response = Change, + Error = BoxError, + > + Clone, S::Future: Send + 'static, { let initial_peer_count = initial_peers.len(); let mut handshake_success_total: usize = 0; let mut handshake_error_total: usize = 0; + let mut active_outbound_connections = ActiveConnectionCounter::new_counter(); + info!( ?initial_peer_count, ?initial_peers, @@ -218,9 +233,15 @@ where let mut handshakes: FuturesUnordered<_> = initial_peers .into_iter() .map(|addr| { + let connection_tracker = active_outbound_connections.track_connection(); + let req = OutboundConnectorRequest { + addr, + connection_tracker, + }; + outbound_connector .clone() - .oneshot(addr) + .oneshot(req) .map_err(move |e| (addr, e)) }) .collect(); @@ -237,8 +258,8 @@ where ); } Err((addr, ref e)) => { - // this is verbose, but it's better than just hanging with no output when there are errors handshake_error_total += 1; + // this is verbose, but it's better than just hanging with no output when there are errors info!( ?handshake_success_total, ?handshake_error_total, @@ -249,16 +270,20 @@ where } } - tx.send(handshake_result.map_err(|(_addr, e)| e)).await?; + peerset_tx + .send(handshake_result.map_err(|(_addr, e)| e)) + .await?; } + let outbound_connections = active_outbound_connections.update_count(); info!( ?handshake_success_total, ?handshake_error_total, + ?outbound_connections, "finished connecting to initial seed peers" ); - Ok(()) + Ok(active_outbound_connections) } /// Open a peer connection listener on `config.listen_addr`, @@ -317,19 +342,28 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) { /// Zcash peer. /// /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends -/// the [`Client`][peer::Client] result over `tx`. -#[instrument(skip(listener, handshaker, tx), fields(listener_addr = ?listener.local_addr()))] +/// the [`peer::Client`] result over `peerset_tx`. +#[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))] async fn accept_inbound_connections( listener: TcpListener, mut handshaker: S, - tx: mpsc::Sender, + peerset_tx: mpsc::Sender, ) -> Result<(), BoxError> where S: Service + Clone, S::Future: Send + 'static, { + let mut active_inbound_connections = ActiveConnectionCounter::new_counter(); + loop { if let Ok((tcp_stream, addr)) = listener.accept().await { + // The peer already opened a connection, so increment the connection count immediately. + let connection_tracker = active_inbound_connections.track_connection(); + info!( + inbound_connections = ?active_inbound_connections.update_count(), + "handshaking on an open inbound peer connection" + ); + let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); let accept_span = info_span!("listen_accept", peer = ?connected_addr); let _guard = accept_span.enter(); @@ -338,18 +372,25 @@ where handshaker.ready_and().await?; // TODO: distinguish between proxied listeners and direct listeners let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); + // Construct a handshake future but do not drive it yet.... - let handshake = handshaker.call((tcp_stream, connected_addr)); + let handshake = handshaker.call(HandshakeRequest { + tcp_stream, + connected_addr, + connection_tracker, + }); // ... instead, spawn a new task to handle this connection - let mut tx2 = tx.clone(); - tokio::spawn( - async move { - if let Ok(client) = handshake.await { - let _ = tx2.send(Ok(Change::Insert(addr, client))).await; + { + let mut peerset_tx = peerset_tx.clone(); + tokio::spawn( + async move { + if let Ok(client) = handshake.await { + let _ = peerset_tx.send(Ok(Change::Insert(addr, client))).await; + } } - } - .instrument(handshaker_span), - ); + .instrument(handshaker_span), + ); + } } } } @@ -368,7 +409,7 @@ enum CrawlerAction { TimerCrawl { tick: Instant }, /// Handle a successfully connected handshake `peer_set_change`. HandshakeConnected { - peer_set_change: Change, + peer_set_change: Change, }, /// Handle a handshake failure to `failed_addr`. HandshakeFailed { failed_addr: MetaAddr }, @@ -376,7 +417,7 @@ enum CrawlerAction { /// Given a channel `demand_rx` that signals a need for new peers, try to find /// and connect to new peers, and send the resulting `peer::Client`s through the -/// `success_tx` channel. +/// `peerset_tx` channel. /// /// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is /// demand, but no new peers in `candidates`. After crawling, try to connect to @@ -385,21 +426,28 @@ enum CrawlerAction { /// If a handshake fails, restore the unused demand signal by sending it to /// `demand_tx`. /// -/// The crawler terminates when `candidates.update()` or `success_tx` returns a +/// The crawler terminates when `candidates.update()` or `peerset_tx` returns a /// permanent internal error. Transient errors and individual peer errors should /// be handled within the crawler. -#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, success_tx))] +/// +/// Uses `active_outbound_connections` to track active outbound connections +/// in both the initial peers and crawler. +#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))] async fn crawl_and_dial( crawl_new_peer_interval: std::time::Duration, mut demand_tx: mpsc::Sender<()>, mut demand_rx: mpsc::Receiver<()>, mut candidates: CandidateSet, outbound_connector: C, - mut success_tx: mpsc::Sender, + mut peerset_tx: mpsc::Sender, + mut active_outbound_connections: ActiveConnectionCounter, ) -> Result<(), BoxError> where - C: Service, Error = BoxError> - + Clone + C: Service< + OutboundConnectorRequest, + Response = Change, + Error = BoxError, + > + Clone + Send + 'static, C::Future: Send + 'static, @@ -465,16 +513,27 @@ where continue; } DemandHandshake { candidate } => { - // spawn each handshake into an independent task, so it can make - // progress independently of the crawls - let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone())) - .map(move |res| match res { - Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during handshaking with {:?}: {:?} ", candidate, e); - } - }) - .instrument(Span::current()); + // Increment the connection count before we spawn the connection. + let outbound_connection_tracker = active_outbound_connections.track_connection(); + info!( + outbound_connections = ?active_outbound_connections.update_count(), + "opening an outbound peer connection" + ); + + // Spawn each handshake into an independent task, so it can make + // progress independently of the crawls. + let hs_join = tokio::spawn(dial( + candidate, + outbound_connector.clone(), + outbound_connection_tracker, + )) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during handshaking with {:?}: {:?} ", candidate, e); + } + }) + .instrument(Span::current()); handshakes.push(Box::pin(hs_join)); } DemandCrawl => { @@ -506,9 +565,11 @@ where } // successes are handled by an independent task, so they // shouldn't hang - success_tx.send(Ok(peer_set_change)).await?; + peerset_tx.send(Ok(peer_set_change)).await?; } HandshakeFailed { failed_addr } => { + // The connection was never opened, or it failed the handshake and was dropped. + debug!(?failed_addr.addr, "marking candidate as failed"); candidates.report_failed(&failed_addr); // The demand signal that was taken out of the queue @@ -521,14 +582,22 @@ where } /// Try to connect to `candidate` using `outbound_connector`. +/// Uses `outbound_connection_tracker` to track the active connection count. /// /// Returns a `HandshakeConnected` action on success, and a /// `HandshakeFailed` action on error. -#[instrument(skip(outbound_connector,))] -async fn dial(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction +#[instrument(skip(outbound_connector, outbound_connection_tracker))] +async fn dial( + candidate: MetaAddr, + mut outbound_connector: C, + outbound_connection_tracker: ConnectionTracker, +) -> CrawlerAction where - C: Service, Error = BoxError> - + Clone + C: Service< + OutboundConnectorRequest, + Response = Change, + Error = BoxError, + > + Clone + Send + 'static, C::Future: Send + 'static, @@ -547,16 +616,21 @@ where .await .expect("outbound connector never errors"); + let req = OutboundConnectorRequest { + addr: candidate.addr, + connection_tracker: outbound_connection_tracker, + }; + // the handshake has timeouts, so it shouldn't hang outbound_connector - .call(candidate.addr) + .call(req) .map_err(|e| (candidate, e)) .map(Into::into) .await } -impl From, (MetaAddr, BoxError)>> for CrawlerAction { - fn from(dial_result: Result, (MetaAddr, BoxError)>) -> Self { +impl From, (MetaAddr, BoxError)>> for CrawlerAction { + fn from(dial_result: Result, (MetaAddr, BoxError)>) -> Self { use CrawlerAction::*; match dial_result { Ok(peer_set_change) => HandshakeConnected { peer_set_change }, diff --git a/zebra-network/src/peer_set/limit.rs b/zebra-network/src/peer_set/limit.rs new file mode 100644 index 00000000000..0d2ff1afe0a --- /dev/null +++ b/zebra-network/src/peer_set/limit.rs @@ -0,0 +1,125 @@ +//! Counting active connections used by Zebra. +//! +//! These types can be used to count any kind of active resource. +//! But they are currently used to track the number of open connections. + +use std::fmt; + +use tokio::sync::mpsc; + +/// A signal sent by a [`Connection`] when it closes. +/// +/// Used to count the number of open connections. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub struct ConnectionClosed; + +/// A counter for active connections. +/// +/// Creates a [`ConnectionTracker`] to track each active connection. +/// When these trackers are dropped, the counter gets notified. +pub struct ActiveConnectionCounter { + /// The number of active peers tracked using this counter. + count: usize, + + /// The channel used to send closed connection notifications. + close_notification_tx: mpsc::UnboundedSender, + + /// The channel used to receive closed connection notifications. + close_notification_rx: mpsc::UnboundedReceiver, +} + +impl fmt::Debug for ActiveConnectionCounter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ActiveConnectionCounter") + .field("count", &self.count) + .finish() + } +} + +impl ActiveConnectionCounter { + /// Create and return a new active connection counter. + pub fn new_counter() -> Self { + // TODO: This channel will be bounded by the connection limit (#1850, #1851, #2902). + let (close_notification_tx, close_notification_rx) = mpsc::unbounded_channel(); + + Self { + count: 0, + close_notification_rx, + close_notification_tx, + } + } + + /// Create and return a new [`ConnectionTracker`], and add 1 to this counter. + /// + /// When the returned tracker is dropped, this counter will be notified, and decreased by 1. + pub fn track_connection(&mut self) -> ConnectionTracker { + ConnectionTracker::new(self) + } + + /// Check for closed connection notifications, and return the current connection count. + pub fn update_count(&mut self) -> usize { + let previous_connections = self.count; + + // We ignore errors here: + // - TryRecvError::Empty means that there are no pending close notifications + // - TryRecvError::Closed is unreachable, because we hold a sender + while let Ok(ConnectionClosed) = self.close_notification_rx.try_recv() { + self.count -= 1; + + debug!( + open_connections = ?self.count, + ?previous_connections, + "a peer connection was closed" + ); + } + + debug!( + open_connections = ?self.count, + ?previous_connections, + "updated active connection count" + ); + + self.count + } +} + +/// A per-connection tracker. +/// +/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection. +/// When these trackers are dropped, the counter gets notified. +pub struct ConnectionTracker { + /// The channel used to send closed connection notifications on drop. + close_notification_tx: mpsc::UnboundedSender, +} + +impl fmt::Debug for ConnectionTracker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ConnectionTracker").finish() + } +} + +impl ConnectionTracker { + /// Create and return a new active connection tracker, and add 1 to `counter`. + /// + /// When the returned tracker is dropped, `counter` will be notified, and decreased by 1. + fn new(counter: &mut ActiveConnectionCounter) -> Self { + counter.count += 1; + + info!(open_connections = ?counter.count, "opening a new peer connection"); + + Self { + close_notification_tx: counter.close_notification_tx.clone(), + } + } +} + +impl Drop for ConnectionTracker { + /// Notifies the corresponding connection counter that the connection has closed. + fn drop(&mut self) { + // We ignore disconnected errors, because the receiver can be dropped + // before some connections are dropped. + // + // TODO: This channel will be bounded by the connection limit (#1850, #1851, #2902). + let _ = self.close_notification_tx.send(ConnectionClosed); + } +}