Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ping/pong feature with juliet timeouts #4331

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ casper-execution-engine = { version = "5.0.0", path = "../execution_engine" }
casper-hashing = { version = "2.0.0", path = "../hashing" }
casper-json-rpc = { version = "1.1.0", path = "../json_rpc" }
casper-types = { version = "3.0.0", path = "../types", features = ["datasize", "json-schema", "std"] }
datasize = { version = "0.2.11", features = ["detailed", "fake_clock-types", "futures-types", "smallvec-types"] }
datasize = { version = "0.2.15", features = ["detailed", "fake_clock-types", "futures-types", "smallvec-types"] }
derive_more = "0.99.7"
either = { version = "1", features = ["serde"] }
enum-iterator = "0.6.0"
Expand Down
80 changes: 20 additions & 60 deletions node/src/components/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ mod error;
mod event;
mod gossiped_address;
mod handshake;
mod health;
mod identity;
mod insights;
mod message;
Expand Down Expand Up @@ -85,7 +84,6 @@ use self::{
chain_info::ChainInfo,
error::{ConnectionError, MessageReceiverError},
event::{IncomingConnection, OutgoingConnection},
health::{HealthConfig, TaggedTimestamp},
message::NodeKeyPair,
metrics::Metrics,
outgoing::{DialOutcome, DialRequest, OutgoingConfig, OutgoingManager},
Expand Down Expand Up @@ -140,20 +138,6 @@ const BASE_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
/// Interval during which to perform outgoing manager housekeeping.
const OUTGOING_MANAGER_SWEEP_INTERVAL: Duration = Duration::from_secs(1);

/// How often to send a ping down a healthy connection.
const PING_INTERVAL: Duration = Duration::from_secs(30);

/// Maximum time for a ping until its connection is severed.
///
/// If you are running a network under very extreme conditions, it may make sense to alter these
/// values, but usually these values should require no changing.
///
/// `PING_TIMEOUT` should be less than `PING_INTERVAL` at all times.
const PING_TIMEOUT: Duration = Duration::from_secs(6);

/// How many pings to send before giving up and dropping the connection.
const PING_RETRIES: u16 = 5;

#[derive(Clone, DataSize, Debug)]
pub(crate) struct OutgoingHandle {
#[data_size(skip)] // Unfortunately, there is no way to inspect an `UnboundedSender`.
Expand Down Expand Up @@ -244,12 +228,6 @@ where
base_timeout: BASE_RECONNECTION_TIMEOUT,
unblock_after: cfg.blocklist_retain_duration.into(),
sweep_timeout: cfg.max_addr_pending_time.into(),
health: HealthConfig {
ping_interval: PING_INTERVAL,
ping_timeout: PING_TIMEOUT,
ping_retries: PING_RETRIES,
pong_limit: (1 + PING_RETRIES as u32) * 2,
},
},
net_metrics.create_outgoing_metrics(),
);
Expand Down Expand Up @@ -490,11 +468,24 @@ where
};
trace!(%msg, encoded_size=payload.len(), %channel, "enqueing message for sending");

// Build the request.
let request = connection
.rpc_client
.create_request(channel.into_channel_id())
.with_payload(payload);
/// Constructs the outgoing request for `payload` on `channel`.
///
/// Internal helper so that every call site assembles its request in exactly the same way:
/// channel ID first, then the payload, then a fixed 30 second timeout.
// Note: A closure would be the natural fit, but lifetime inference does not work out
//       here, and lifetimes cannot be annotated on closures.
#[inline(always)]
fn mk_request(
    rpc_client: &JulietRpcClient<{ Channel::COUNT }>,
    channel: Channel,
    payload: Bytes,
) -> juliet::rpc::JulietRpcRequestBuilder<'_, { Channel::COUNT }> {
    /// Timeout attached to every request built by this helper.
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

    let channel_id = channel.into_channel_id();
    let builder = rpc_client.create_request(channel_id);

    builder.with_payload(payload).with_timeout(REQUEST_TIMEOUT)
}

let request = mk_request(&connection.rpc_client, channel, payload);

// Attempt to enqueue it directly, regardless of what `message_queued_responder` is.
match request.try_queue_for_sending() {
Expand All @@ -521,9 +512,7 @@ where
// since the networking component usually controls its own futures, we are
// allowed to spawn these as well.
tokio::spawn(async move {
let guard = client
.create_request(channel.into_channel_id())
.with_payload(payload)
let guard = mk_request(&client, channel, payload)
.queue_for_sending()
.await;
responder.respond(()).await;
Expand Down Expand Up @@ -879,7 +868,6 @@ where
addr: peer_addr,
handle,
node_id: peer_id,
when: now,
});

let mut effects = self.process_dial_requests(request);
Expand Down Expand Up @@ -998,14 +986,6 @@ where
debug!("dropping connection, as requested");
})
}
DialRequest::SendPing {
peer_id,
nonce,
span,
} => span.in_scope(|| {
trace!("enqueuing ping to be sent");
self.send_message(peer_id, Arc::new(Message::Ping { nonce }), None);
}),
}
}

Expand Down Expand Up @@ -1033,26 +1013,6 @@ where
warn!("received unexpected handshake");
Effects::new()
}
Message::Ping { nonce } => {
// Send a pong. Incoming pings and pongs are rate limited.

self.send_message(peer_id, Arc::new(Message::Pong { nonce }), None);
Effects::new()
}
Message::Pong { nonce } => {
// Record the time the pong arrived and forward it to outgoing.
let pong = TaggedTimestamp::from_parts(Instant::now(), nonce);
if self.outgoing_manager.record_pong(peer_id, pong) {
effect_builder
.announce_block_peer_with_justification(
peer_id,
BlocklistJustification::PongLimitExceeded,
)
.ignore()
} else {
Effects::new()
}
}
Message::Payload(payload) => effect_builder
.announce_incoming(peer_id, payload, ticket)
.ignore(),
Expand Down Expand Up @@ -1332,7 +1292,7 @@ where
}
Event::SweepOutgoing => {
let now = Instant::now();
let requests = self.outgoing_manager.perform_housekeeping(rng, now);
let requests = self.outgoing_manager.perform_housekeeping(now);

let mut effects = self.process_dial_requests(requests);

Expand Down
5 changes: 0 additions & 5 deletions node/src/components/network/blocklist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ pub(crate) enum BlocklistJustification {
/// The era for which the invalid value was destined.
era: EraId,
},
/// Too many unasked or expired pongs were sent by the peer.
PongLimitExceeded,
/// Peer misbehaved during consensus and is blocked for it.
BadConsensusBehavior,
/// Peer is on the wrong network.
Expand Down Expand Up @@ -76,9 +74,6 @@ impl Display for BlocklistJustification {
BlocklistJustification::SentInvalidConsensusValue { era } => {
write!(f, "sent an invalid consensus value in {}", era)
}
BlocklistJustification::PongLimitExceeded => {
f.write_str("wrote too many expired or invalid pongs")
}
BlocklistJustification::BadConsensusBehavior => {
f.write_str("sent invalid data in consensus")
}
Expand Down
Loading
Loading