Skip to content

Commit

Permalink
Track the number of active inbound and outbound peer connections (#2912)
Browse files Browse the repository at this point in the history
* Count the number of active inbound and outbound peer connections

And reduce the count when each connection fails.

* Fix a comment typo

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
  • Loading branch information
teor2345 and oxarbitrage committed Oct 21, 2021
1 parent 86d05c5 commit 4cdd12e
Show file tree
Hide file tree
Showing 8 changed files with 368 additions and 94 deletions.
19 changes: 14 additions & 5 deletions zebra-network/src/isolated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use tower::{
use zebra_chain::chain_tip::NoChainTip;

use crate::{
peer::{self, ConnectedAddr},
peer::{self, ConnectedAddr, HandshakeRequest},
peer_set::ActiveConnectionCounter,
BoxError, Config, Request, Response,
};

Expand Down Expand Up @@ -62,11 +63,19 @@ pub fn connect_isolated(
.finish()
.expect("provided mandatory builder parameters");

// Don't send any metadata about the connection
// Don't send or track any metadata about the connection
let connected_addr = ConnectedAddr::new_isolated();

Oneshot::new(handshake, (conn, connected_addr))
.map_ok(|client| BoxService::new(Wrapper(client)))
let connection_tracker = ActiveConnectionCounter::new_counter().track_connection();

Oneshot::new(
handshake,
HandshakeRequest {
tcp_stream: conn,
connected_addr,
connection_tracker,
},
)
.map_ok(|client| BoxService::new(Wrapper(client)))
}

// This can be deleted when a new version of Tower with map_err is released.
Expand Down
7 changes: 2 additions & 5 deletions zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@ mod error;
/// Performs peer handshakes.
mod handshake;

use client::ClientRequest;
use client::ClientRequestReceiver;
use client::InProgressClientRequest;
use client::MustUseOneshotSender;
use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
use error::ErrorSlot;

pub use client::Client;
pub use connection::Connection;
pub use connector::Connector;
pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
38 changes: 30 additions & 8 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ use zebra_chain::{

use crate::{
constants,
peer::{
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
SharedPeerError,
},
peer_set::ConnectionTracker,
protocol::{
external::{types::Nonce, InventoryHash, Message},
internal::{Request, Response},
},
BoxError,
};

use super::{
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
SharedPeerError,
};

#[derive(Debug)]
pub(super) enum Handler {
/// Indicates that the handler has finished processing the request.
Expand Down Expand Up @@ -314,19 +314,41 @@ pub(super) enum State {

/// The state associated with a peer connection.
pub struct Connection<S, Tx> {
/// The state of this connection's current request or response.
pub(super) state: State,

/// A timeout for a client request. This is stored separately from
/// State so that we can move the future out of it independently of
/// other state handling.
pub(super) request_timer: Option<Sleep>,

/// The `inbound` service, used to answer requests from this connection's peer.
pub(super) svc: S,
/// A `mpsc::Receiver<ClientRequest>` that converts its results to
/// `InProgressClientRequest`

/// A channel that receives network requests from the rest of Zebra.
///
/// This channel produces `InProgressClientRequest`s.
pub(super) client_rx: ClientRequestReceiver,

/// A slot for an error shared between the Connection and the Client that uses it.
pub(super) error_slot: ErrorSlot,
//pub(super) peer_rx: Rx,

/// A channel for sending requests to the connected peer.
pub(super) peer_tx: Tx,

/// A connection tracker that reduces the open connection count when dropped.
/// Used to limit the number of open connections in Zebra.
///
/// This field does nothing until it is dropped.
///
/// # Security
///
/// If this connection tracker or `Connection`s are leaked,
/// the number of active connections will appear higher than it actually is.
///
/// Eventually, Zebra could stop making connections entirely.
#[allow(dead_code)]
pub(super) connection_tracker: ConnectionTracker,
}

impl<S, Tx> Connection<S, Tx>
Expand Down
38 changes: 32 additions & 6 deletions zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use tracing_futures::Instrument;

use zebra_chain::chain_tip::{ChainTip, NoChainTip};

use crate::{BoxError, Request, Response};

use super::{Client, ConnectedAddr, Handshake};
use crate::{
peer::{Client, ConnectedAddr, Handshake, HandshakeRequest},
peer_set::ConnectionTracker,
BoxError, Request, Response,
};

/// A wrapper around [`peer::Handshake`] that opens a TCP connection before
/// forwarding to the inner handshake service. Writing this as its own
Expand All @@ -37,7 +39,19 @@ impl<S, C> Connector<S, C> {
}
}

impl<S, C> Service<SocketAddr> for Connector<S, C>
/// A connector request.
/// Contains the information needed to make an outbound connection to the peer.
pub struct OutboundConnectorRequest {
/// The Zcash listener address of the peer.
pub addr: SocketAddr,

/// A connection tracker that reduces the open connection count when dropped.
///
/// Used to limit the number of open connections in Zebra.
pub connection_tracker: ConnectionTracker,
}

impl<S, C> Service<OutboundConnectorRequest> for Connector<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
Expand All @@ -52,14 +66,26 @@ where
Poll::Ready(Ok(()))
}

fn call(&mut self, addr: SocketAddr) -> Self::Future {
fn call(&mut self, req: OutboundConnectorRequest) -> Self::Future {
let OutboundConnectorRequest {
addr,
connection_tracker,
}: OutboundConnectorRequest = req;

let mut hs = self.handshaker.clone();
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let connector_span = info_span!("connector", peer = ?connected_addr);

async move {
let stream = TcpStream::connect(addr).await?;
hs.ready_and().await?;
let client = hs.call((stream, connected_addr)).await?;
let client = hs
.call(HandshakeRequest {
tcp_stream: stream,
connected_addr,
connection_tracker,
})
.await?;
Ok(Change::Insert(addr, client))
}
.instrument(connector_span)
Expand Down
28 changes: 23 additions & 5 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use zebra_chain::{
use crate::{
constants,
meta_addr::MetaAddrChange,
peer::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError},
peer_set::ConnectionTracker,
protocol::{
external::{types::*, Codec, InventoryHash, Message},
internal::{Request, Response},
Expand All @@ -37,8 +39,6 @@ use crate::{
BoxError, Config,
};

use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError};

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
Expand Down Expand Up @@ -658,7 +658,20 @@ pub async fn negotiate_version(
Ok((remote_version, remote_services, remote_canonical_addr))
}

pub type HandshakeRequest = (TcpStream, ConnectedAddr);
/// A handshake request.
/// Contains the information needed to handshake with the peer.
pub struct HandshakeRequest {
/// The TCP connection to the peer.
pub tcp_stream: TcpStream,

/// The address of the peer, and other related information.
pub connected_addr: ConnectedAddr,

/// A connection tracker that reduces the open connection count when dropped.
///
/// Used to limit the number of open connections in Zebra.
pub connection_tracker: ConnectionTracker,
}

impl<S, C> Service<HandshakeRequest> for Handshake<S, C>
where
Expand All @@ -676,7 +689,11 @@ where
}

fn call(&mut self, req: HandshakeRequest) -> Self::Future {
let (tcp_stream, connected_addr) = req;
let HandshakeRequest {
tcp_stream,
connected_addr,
connection_tracker,
} = req;

let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
// set the peer connection span's parent to the global span, as it
Expand Down Expand Up @@ -892,11 +909,12 @@ where
use super::connection;
let server = Connection {
state: connection::State::AwaitingRequest,
request_timer: None,
svc: inbound_service,
client_rx: server_rx.into(),
error_slot: slot,
peer_tx,
request_timer: None,
connection_tracker,
};

tokio::spawn(
Expand Down
3 changes: 3 additions & 0 deletions zebra-network/src/peer_set.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
pub(crate) mod candidate_set;
mod initialize;
mod inventory_registry;
mod limit;
mod set;
mod unready_service;

pub(crate) use candidate_set::CandidateSet;
pub(crate) use limit::{ActiveConnectionCounter, ConnectionTracker};

use inventory_registry::InventoryRegistry;
use set::PeerSet;

Expand Down
Loading

0 comments on commit 4cdd12e

Please sign in to comment.