Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track the number of active inbound and outbound peer connections #2912

Merged
merged 4 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions zebra-network/src/isolated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use tower::{
use zebra_chain::chain_tip::NoChainTip;

use crate::{
peer::{self, ConnectedAddr},
peer::{self, ConnectedAddr, HandshakeRequest},
peer_set::ActiveConnectionCounter,
BoxError, Config, Request, Response,
};

Expand Down Expand Up @@ -62,11 +63,19 @@ pub fn connect_isolated(
.finish()
.expect("provided mandatory builder parameters");

// Don't send any metadata about the connection
// Don't send or track any metadata about the connection
let connected_addr = ConnectedAddr::new_isolated();

Oneshot::new(handshake, (conn, connected_addr))
.map_ok(|client| BoxService::new(Wrapper(client)))
let connection_tracker = ActiveConnectionCounter::new_counter().track_connection();

Oneshot::new(
handshake,
HandshakeRequest {
tcp_stream: conn,
connected_addr,
connection_tracker,
},
)
.map_ok(|client| BoxService::new(Wrapper(client)))
}

// This can be deleted when a new version of Tower with map_err is released.
Expand Down
7 changes: 2 additions & 5 deletions zebra-network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@ mod error;
/// Performs peer handshakes.
mod handshake;

use client::ClientRequest;
use client::ClientRequestReceiver;
use client::InProgressClientRequest;
use client::MustUseOneshotSender;
use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
use error::ErrorSlot;

pub use client::Client;
pub use connection::Connection;
pub use connector::Connector;
pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
38 changes: 30 additions & 8 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ use zebra_chain::{

use crate::{
constants,
peer::{
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
SharedPeerError,
},
peer_set::ConnectionTracker,
protocol::{
external::{types::Nonce, InventoryHash, Message},
internal::{Request, Response},
},
BoxError,
};

use super::{
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
SharedPeerError,
};

#[derive(Debug)]
pub(super) enum Handler {
/// Indicates that the handler has finished processing the request.
Expand Down Expand Up @@ -314,19 +314,41 @@ pub(super) enum State {

/// The state associated with a peer connection.
pub struct Connection<S, Tx> {
/// The state of this connection's current request or response.
pub(super) state: State,

/// A timeout for a client request. This is stored separately from
/// State so that we can move the future out of it independently of
/// other state handling.
pub(super) request_timer: Option<Sleep>,

/// The `inbound` service, used to answer requests from this connection's peer.
pub(super) svc: S,
/// A `mpsc::Receiver<ClientRequest>` that converts its results to
/// `InProgressClientRequest`

/// A channel that receives network requests from the rest of Zebra.
///
/// This channel produces `InProgressClientRequest`s.
pub(super) client_rx: ClientRequestReceiver,

/// A slot for an error shared between the Connection and the Client that uses it.
pub(super) error_slot: ErrorSlot,
//pub(super) peer_rx: Rx,

/// A channel for sending requests to the connected peer.
pub(super) peer_tx: Tx,

/// A connection tracker that reduces the open connection count when dropped.
/// Used to limit the number of open connections in Zebra.
///
/// This field does nothing until it is dropped.
///
/// # Security
///
/// If this connection tracker or `Connection`s are leaked,
/// the number of active connections will appear higher than it actually is.
///
/// Eventually, Zebra could stop making connections entirely.
#[allow(dead_code)]
pub(super) connection_tracker: ConnectionTracker,
}

impl<S, Tx> Connection<S, Tx>
Expand Down
38 changes: 32 additions & 6 deletions zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use tracing_futures::Instrument;

use zebra_chain::chain_tip::{ChainTip, NoChainTip};

use crate::{BoxError, Request, Response};

use super::{Client, ConnectedAddr, Handshake};
use crate::{
peer::{Client, ConnectedAddr, Handshake, HandshakeRequest},
peer_set::ConnectionTracker,
BoxError, Request, Response,
};

/// A wrapper around [`peer::Handshake`] that opens a TCP connection before
/// forwarding to the inner handshake service. Writing this as its own
Expand All @@ -37,7 +39,19 @@ impl<S, C> Connector<S, C> {
}
}

impl<S, C> Service<SocketAddr> for Connector<S, C>
/// A connector request.
/// Contains the information needed to make an outbound connection to the peer.
pub struct OutboundConnectorRequest {
/// The Zcash listener address of the peer.
pub addr: SocketAddr,

/// A connection tracker that reduces the open connection count when dropped.
///
/// Used to limit the number of open connections in Zebra.
pub connection_tracker: ConnectionTracker,
}

impl<S, C> Service<OutboundConnectorRequest> for Connector<S, C>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send,
Expand All @@ -52,14 +66,26 @@ where
Poll::Ready(Ok(()))
}

fn call(&mut self, addr: SocketAddr) -> Self::Future {
fn call(&mut self, req: OutboundConnectorRequest) -> Self::Future {
let OutboundConnectorRequest {
addr,
connection_tracker,
}: OutboundConnectorRequest = req;

let mut hs = self.handshaker.clone();
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let connector_span = info_span!("connector", peer = ?connected_addr);

async move {
let stream = TcpStream::connect(addr).await?;
hs.ready_and().await?;
let client = hs.call((stream, connected_addr)).await?;
let client = hs
.call(HandshakeRequest {
tcp_stream: stream,
connected_addr,
connection_tracker,
})
.await?;
Ok(Change::Insert(addr, client))
}
.instrument(connector_span)
Expand Down
28 changes: 23 additions & 5 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use zebra_chain::{
use crate::{
constants,
meta_addr::MetaAddrChange,
peer::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError},
peer_set::ConnectionTracker,
protocol::{
external::{types::*, Codec, InventoryHash, Message},
internal::{Request, Response},
Expand All @@ -37,8 +39,6 @@ use crate::{
BoxError, Config,
};

use super::{Client, ClientRequest, Connection, ErrorSlot, HandshakeError, PeerError};

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
Expand Down Expand Up @@ -658,7 +658,20 @@ pub async fn negotiate_version(
Ok((remote_version, remote_services, remote_canonical_addr))
}

pub type HandshakeRequest = (TcpStream, ConnectedAddr);
/// A handshake request.
/// Contains the information needed to handshake with the peer.
pub struct HandshakeRequest {
/// The TCP connection to the peer.
pub tcp_stream: TcpStream,

/// The address of the peer, and other related information.
pub connected_addr: ConnectedAddr,

/// A connection tracker that reduces the open connection count when dropped.
///
/// Used to limit the number of open connections in Zebra.
pub connection_tracker: ConnectionTracker,
}

impl<S, C> Service<HandshakeRequest> for Handshake<S, C>
where
Expand All @@ -676,7 +689,11 @@ where
}

fn call(&mut self, req: HandshakeRequest) -> Self::Future {
let (tcp_stream, connected_addr) = req;
let HandshakeRequest {
tcp_stream,
connected_addr,
connection_tracker,
} = req;

let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
// set the peer connection span's parent to the global span, as it
Expand Down Expand Up @@ -892,11 +909,12 @@ where
use super::connection;
let server = Connection {
state: connection::State::AwaitingRequest,
request_timer: None,
svc: inbound_service,
client_rx: server_rx.into(),
error_slot: slot,
peer_tx,
request_timer: None,
connection_tracker,
};

tokio::spawn(
Expand Down
3 changes: 3 additions & 0 deletions zebra-network/src/peer_set.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
pub(crate) mod candidate_set;
mod initialize;
mod inventory_registry;
mod limit;
mod set;
mod unready_service;

pub(crate) use candidate_set::CandidateSet;
pub(crate) use limit::{ActiveConnectionCounter, ConnectionTracker};

use inventory_registry::InventoryRegistry;
use set::PeerSet;

Expand Down
Loading