Skip to content

Commit

Permalink
Create an ActiveConnectionCounter type to manage connection tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Oct 21, 2021
1 parent 68c5b21 commit 8a9d0eb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 114 deletions.
2 changes: 2 additions & 0 deletions zebra-network/src/peer_set.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
pub(crate) mod candidate_set;
mod initialize;
mod inventory_registry;
mod limit;
mod set;
mod unready_service;

pub(crate) use candidate_set::CandidateSet;
pub(crate) use limit::ActiveConnectionCounter;
pub(crate) use set::MorePeers;

use inventory_registry::InventoryRegistry;
Expand Down
168 changes: 54 additions & 114 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@
// Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed).

use std::{
net::SocketAddr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use std::{net::SocketAddr, sync::Arc};

use futures::{
channel::mpsc,
Expand All @@ -32,7 +26,7 @@ use crate::{
constants,
meta_addr::MetaAddr,
peer,
peer_set::{CandidateSet, MorePeers, PeerSet},
peer_set::{ActiveConnectionCounter, CandidateSet, MorePeers, PeerSet},
timestamp_collector::TimestampCollector,
AddressBook, BoxError, Config, Request, Response,
};
Expand All @@ -45,17 +39,6 @@ mod tests;
/// This result comes from the [`Handshaker`].
type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;

/// A signal sent by a [`Connection`] when it closes.
///
/// Used to count the number of open connections.
///
/// This is a unit struct, so the message carries no data. Each one received
/// by the connection tracker task decrements `PEER_CONNECTION_COUNT` by one.
///
/// It is also sent when a handshake fails, because the connection count is
/// incremented before the handshake has completed.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
struct ConnectionClosed;

/// The current number of peers connected to Zebra.
///
/// The inbound, initial-peer, and crawler code paths add to this count with
/// `fetch_add` before the corresponding handshake completes. The connection
/// tracker task subtracts one with `fetch_sub` for every `ConnectionClosed`
/// signal it receives.
///
/// Note: `fetch_add` and `fetch_sub` return the value from *before* the
/// update, so a value logged directly from their result is the previous
/// count, not the updated one.
///
/// TODO: replace with non-atomic usize in the inbound/outbound tasks in #2904
static PEER_CONNECTION_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Initialize a peer set, using a network `config`, `inbound_service`,
/// and `latest_chain_tip`.
///
Expand Down Expand Up @@ -126,9 +109,6 @@ where
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
// Create an mpsc channel for peerset demand signaling.
let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(100);
// Create an mpsc channel to count open connections.
// This channel will be bounded by the connection limit (#1850, #1851, #2902).
let (connection_tx, mut connection_rx) = mpsc::unbounded::<ConnectionClosed>();

// Create a oneshot to send background task JoinHandles to the peer set
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
Expand All @@ -154,66 +134,42 @@ where
//
// 1. Incoming peer connections, via a listener.
let listen_guard = tokio::spawn(
accept_inbound_connections(
tcp_listener,
listen_handshaker,
peerset_tx.clone(),
connection_tx.clone(),
)
.instrument(Span::current()),
accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
.instrument(Span::current()),
);

// 2. Initial peers, specified in the config.
let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel();
let initial_peers_fut = {
let config = config.clone();
let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone();
let connection_tx = connection_tx.clone();
async move {
let initial_peers = config.initial_peers().await;
let _ = initial_peer_count_tx.send(initial_peers.len());
add_initial_peers(initial_peers, outbound_connector, peerset_tx, connection_tx).await
add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
}
.boxed()
};

let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current()));
let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));

// 3. Outgoing peers we connect to in response to load.
let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());

// Keep the connection count up to date
//
// TODO: replace with checking the receiver in the inbound/outbound tasks in #2904
let connection_count_fut = {
async move {
// if all the senders are closed, we're probably shutting down, so exit anyway
while let Some(ConnectionClosed) = connection_rx.next().await {
// A connection was closed.
let open_connections = PEER_CONNECTION_COUNT.fetch_sub(1, Ordering::SeqCst);
info!(?open_connections, "a peer connection closed");
}

let open_connections = PEER_CONNECTION_COUNT.load(Ordering::SeqCst);
info!(?open_connections, "stopping connection tracker");

Ok(())
}
.boxed()
};

let connection_count_guard = tokio::spawn(connection_count_fut.instrument(Span::current()));
// Wait for the initial seed peer count
let mut active_outbound_connections = initial_peers_join
.await
.expect("unexpected panic in spawned initial peers task")
.expect("unexpected error connecting to initial peers");
let initial_peer_count = active_outbound_connections.update_count();

// We need to await candidates.update() here, because zcashd only sends one
// `addr` message per connection, and if we only have one initial peer we
// need to ensure that its `addr` message is used by the crawler.

info!("Sending initial request for peers");
let _ = candidates
.update_initial(initial_peer_count_rx.await.expect("value sent before drop"))
.await;
let _ = candidates.update_initial(initial_peer_count).await;

// TODO: reduce demand by initial_peer_count (#2902)
for _ in 0..config.peerset_initial_target_size {
let _ = demand_tx.try_send(MorePeers);
}
Expand All @@ -226,34 +182,24 @@ where
candidates,
outbound_connector,
peerset_tx,
connection_tx.clone(),
active_outbound_connections,
)
.instrument(Span::current()),
);

handle_tx
.send(vec![
add_guard,
listen_guard,
crawl_guard,
connection_count_guard,
])
.unwrap();
handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();

(peer_set, address_book)
}

/// Use the provided `handshaker` to connect to `initial_peers`, then send
/// the results over `peerset_tx`.
///
/// When a connection closes, send a close notification on `connection_tx`.
#[instrument(skip(initial_peers, outbound_connector, peerset_tx, connection_tx))]
#[instrument(skip(initial_peers, outbound_connector, peerset_tx))]
async fn add_initial_peers<S>(
initial_peers: std::collections::HashSet<SocketAddr>,
outbound_connector: S,
mut peerset_tx: mpsc::Sender<PeerChange>,
connection_tx: mpsc::UnboundedSender<ConnectionClosed>,
) -> Result<(), BoxError>
) -> Result<ActiveConnectionCounter, BoxError>
where
S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> + Clone,
S::Future: Send + 'static,
Expand All @@ -262,12 +208,16 @@ where
let mut handshake_success_total: usize = 0;
let mut handshake_error_total: usize = 0;

let mut active_outbound_connections = ActiveConnectionCounter::new_counter();

info!(
?initial_peer_count,
?initial_peers,
"connecting to initial peer set"
);

// TODO: send the connection trackers to the outbound connector

// # Security
//
// TODO: rate-limit initial seed peer connections (#2326)
Expand All @@ -288,17 +238,14 @@ where
})
.collect();

// Increment the connection count before we concurrently open all the connections.
//
// TODO: replace with non-atomic usize in the inbound/outbound tasks in #2904
let max_open_connections =
PEER_CONNECTION_COUNT.fetch_add(initial_peer_count, Ordering::SeqCst);
info!(
?max_open_connections,
"opening connections to initial seed peers"
);

while let Some(handshake_result) = handshakes.next().await {
// TODO: we have already opened the connection - track it before opening
let connection_tracker = active_outbound_connections.track_connection();
debug!(
outbound_connections = ?active_outbound_connections.update_count(),
"opened an initial seed peer connection"
);

match handshake_result {
Ok(ref change) => {
handshake_success_total += 1;
Expand All @@ -311,10 +258,7 @@ where
}
Err((addr, ref e)) => {
// The connection was never opened, or it failed the handshake and was dropped.
//
// We ignore disconnected errors, because the receiver task can shut down
// before some connections are dropped.
let _ = connection_tx.unbounded_send(ConnectionClosed);
std::mem::drop(connection_tracker);

handshake_error_total += 1;
// this is verbose, but it's better than just hanging with no output when there are errors
Expand All @@ -333,15 +277,15 @@ where
.await?;
}

let open_connections = PEER_CONNECTION_COUNT.load(Ordering::SeqCst);
let outbound_connections = active_outbound_connections.update_count();
info!(
?handshake_success_total,
?handshake_error_total,
?open_connections,
?outbound_connections,
"finished connecting to initial seed peers"
);

Ok(())
Ok(active_outbound_connections)
}

/// Open a peer connection listener on `config.listen_addr`,
Expand Down Expand Up @@ -401,27 +345,24 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`peer::Client`] result over `peerset_tx`.
///
/// When a connection closes, send a close notification on `connection_tx`.
#[instrument(skip(listener, handshaker, peerset_tx, connection_tx), fields(listener_addr = ?listener.local_addr()))]
#[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
async fn accept_inbound_connections<S>(
listener: TcpListener,
mut handshaker: S,
peerset_tx: mpsc::Sender<PeerChange>,
connection_tx: mpsc::UnboundedSender<ConnectionClosed>,
) -> Result<(), BoxError>
where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
S::Future: Send + 'static,
{
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();

loop {
if let Ok((tcp_stream, addr)) = listener.accept().await {
// The peer already opened a connection, so increment the connection count immediately.
//
// TODO: replace with non-atomic usize in the inbound/outbound tasks in #2904
let open_connections = PEER_CONNECTION_COUNT.fetch_add(1, Ordering::SeqCst);
let connection_tracker = active_inbound_connections.track_connection();
info!(
?open_connections,
inbound_connections = ?active_inbound_connections.update_count(),
"handshaking on an open inbound peer connection"
);

Expand All @@ -433,22 +374,21 @@ where
handshaker.ready_and().await?;
// TODO: distinguish between proxied listeners and direct listeners
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);

// TODO: send the connection tracker to the handshaker

// Construct a handshake future but do not drive it yet....
let handshake = handshaker.call((tcp_stream, connected_addr));
// ... instead, spawn a new task to handle this connection
{
let mut peerset_tx = peerset_tx.clone();
let connection_tx = connection_tx.clone();
tokio::spawn(
async move {
if let Ok(client) = handshake.await {
let _ = peerset_tx.send(Ok(Change::Insert(addr, client))).await;
} else {
// The connection failed the handshake and was dropped.
//
// We ignore disconnected errors, because the receiver task can shut down
// before some connections are dropped.
let _ = connection_tx.unbounded_send(ConnectionClosed);
std::mem::drop(connection_tracker);
}
}
.instrument(handshaker_span),
Expand Down Expand Up @@ -493,14 +433,15 @@ enum CrawlerAction {
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
///
/// When a connection closes, send a close notification on `connection_tx`.
/// Uses `active_outbound_connections` to track active outbound connections
/// in both the initial peers and crawler.
#[instrument(skip(
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
connection_tx
active_outbound_connections,
))]
async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration,
Expand All @@ -509,7 +450,7 @@ async fn crawl_and_dial<C, S>(
mut candidates: CandidateSet<S>,
outbound_connector: C,
mut peerset_tx: mpsc::Sender<PeerChange>,
connection_tx: mpsc::UnboundedSender<ConnectionClosed>,
mut active_outbound_connections: ActiveConnectionCounter,
) -> Result<(), BoxError>
where
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
Expand Down Expand Up @@ -580,10 +521,13 @@ where
}
DemandHandshake { candidate } => {
// Increment the connection count before we spawn the connection.
//
// TODO: replace with non-atomic usize in the inbound/outbound tasks in #2904
let open_connections = PEER_CONNECTION_COUNT.fetch_add(1, Ordering::SeqCst);
info!(?open_connections, "opening an outbound peer connection");
let _connection_tracker = active_outbound_connections.track_connection();
info!(
outbound_connections = ?active_outbound_connections.update_count(),
"opening an outbound peer connection"
);

// TODO: send the connection tracker to the handshaker

// Spawn each handshake into an independent task, so it can make
// progress independently of the crawls.
Expand Down Expand Up @@ -631,9 +575,7 @@ where
HandshakeFailed { failed_addr } => {
// The connection was never opened, or it failed the handshake and was dropped.
//
// We ignore disconnected errors, because the receiver task can shut down
// before some connections are dropped.
let _ = connection_tx.unbounded_send(ConnectionClosed);
// TODO: drop the connection tracker here?

debug!(?failed_addr.addr, "marking candidate as failed");
candidates.report_failed(&failed_addr);
Expand All @@ -650,8 +592,6 @@ where
///
/// Returns a `HandshakeConnected` action on success, and a
/// `HandshakeFailed` action on error.
///
/// When a connection closes, send a close notification on `connection_tx`.
#[instrument(skip(outbound_connector))]
async fn dial<C>(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction
where
Expand Down

0 comments on commit 8a9d0eb

Please sign in to comment.