Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

### Fixed

- [#6972](https://github.com/ChainSafe/forest/pull/6972) Hardened `ChainExchange` request handling to limit node memory usage.

## Forest v0.33.1 "Paradyzja"

Non-mandatory release for all node operators. It includes support for the NV28 _FireHorse_ network upgrade for devnets (not calibnet or mainnet yet), a number of significant performance improvements and bug fixes.
Expand Down
117 changes: 60 additions & 57 deletions docs/docs/users/reference/env_variables.md

Large diffs are not rendered by default.

148 changes: 147 additions & 1 deletion src/libp2p/chain_exchange/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{
num::NonZeroUsize,
sync::{Arc, LazyLock},
};

use ahash::HashMap;
use libp2p::{
PeerId,
Expand All @@ -9,19 +14,46 @@ use libp2p::{
},
swarm::{NetworkBehaviour, THandlerOutEvent, derive_prelude::*},
};
use nonzero_ext::nonzero;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::debug;

use super::*;
use crate::libp2p::{rpc::RequestResponseError, service::metrics};
use crate::{
libp2p::{rpc::RequestResponseError, service::metrics},
utils::misc::env::env_or_default_logged,
};

type InnerBehaviour = request_response::Behaviour<ChainExchangeCodec>;

/// Node-wide ceiling on the number of inbound chain exchange requests that
/// Forest services at the same time. Requests beyond the ceiling are rejected
/// with [`ChainExchangeResponseStatus::GoAway`].
///
/// The value is resolved lazily on first access through
/// `env_or_default_logged`, keyed by
/// `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS` with `32` as the
/// default.
static MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS: LazyLock<NonZeroUsize> =
    LazyLock::new(|| {
        let fallback = nonzero!(32_usize);
        env_or_default_logged(
            "FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS",
            fallback,
        )
    });

/// Ceiling on the number of inbound chain exchange requests a single peer may
/// have in flight at once. Requests from that peer beyond the ceiling are
/// rejected with [`ChainExchangeResponseStatus::GoAway`].
///
/// The value is resolved lazily on first access through
/// `env_or_default_logged`, keyed by
/// `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER` with `4`
/// as the default.
static MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER: LazyLock<NonZeroUsize> =
    LazyLock::new(|| {
        let fallback = nonzero!(4_usize);
        env_or_default_logged(
            "FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER",
            fallback,
        )
    });

/// Network behaviour for the chain exchange protocol: wraps the libp2p
/// request-response behaviour and adds bookkeeping for outbound responses and
/// inbound request limiting.
pub struct ChainExchangeBehaviour {
/// Underlying request-response behaviour using [`ChainExchangeCodec`].
inner: InnerBehaviour,
/// Senders keyed by outbound request id, each carrying either the
/// [`ChainExchangeResponse`] or a [`RequestResponseError`] for that request.
response_channels: HashMap<
OutboundRequestId,
flume::Sender<Result<ChainExchangeResponse, RequestResponseError>>,
>,
/// Global semaphore handing out permits for inbound requests; sized from
/// `MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS` at construction.
request_limiter: Arc<Semaphore>,
/// One semaphore per peer, created on that peer's first permit request and
/// removed once the peer has no established connections left.
per_peer_limiters: HashMap<PeerId, Arc<Semaphore>>,
}

impl ChainExchangeBehaviour {
Expand All @@ -32,6 +64,34 @@ impl ChainExchangeBehaviour {
cfg,
),
response_channels: Default::default(),
request_limiter: Arc::new(Semaphore::new(
MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS.get(),
)),
per_peer_limiters: Default::default(),
}
}

pub fn try_acquire_request_permit(&self) -> Option<OwnedSemaphorePermit> {
self.request_limiter.clone().try_acquire_owned().ok()
}

/// Lazily creates a per-peer semaphore on first request from `peer`.
pub fn try_acquire_peer_permit(&mut self, peer: PeerId) -> Option<OwnedSemaphorePermit> {
self.per_peer_limiters
.entry(peer)
.or_insert_with(|| {
Arc::new(Semaphore::new(
MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER.get(),
))
})
.clone()
.try_acquire_owned()
.ok()
}

/// Forgets the per-peer limiter of `peer` once its last connection is gone.
///
/// `remaining_established` is the number of connections to `peer` that are
/// still open; the entry is kept while it is non-zero.
///
/// NOTE(review): permits handed out earlier presumably keep the removed
/// semaphore alive, so a reconnecting peer starts with a fresh one — confirm
/// that this is acceptable given the global limiter.
fn on_peer_connection_closed(&mut self, peer: PeerId, remaining_established: usize) {
    if remaining_established != 0 {
        return;
    }
    self.per_peer_limiters.remove(&peer);
}

Expand Down Expand Up @@ -164,6 +224,9 @@ impl NetworkBehaviour for ChainExchangeBehaviour {
}

/// Observes swarm events before handing them to the inner behaviour.
///
/// A `ConnectionClosed` event is first used to update the per-peer limiter
/// bookkeeping; every event is then forwarded unchanged.
fn on_swarm_event(&mut self, event: FromSwarm) {
    if let FromSwarm::ConnectionClosed(closed) = &event {
        let peer = closed.peer_id;
        let still_open = closed.remaining_established;
        self.on_peer_connection_closed(peer, still_open);
    }
    self.inner.on_swarm_event(event);
}

Expand All @@ -174,3 +237,86 @@ impl NetworkBehaviour for ChainExchangeBehaviour {
self.inner.poll(cx)
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a behaviour from the default request-response configuration.
    fn new_behaviour() -> ChainExchangeBehaviour {
        let cfg = request_response::Config::default();
        ChainExchangeBehaviour::new(cfg)
    }

    /// Exhausting one peer's permits must not affect another peer, and
    /// dropping the permits must make them available again.
    #[test]
    fn per_peer_limiter_saturates_independently() {
        let mut behaviour = new_behaviour();
        let (peer_a, peer_b) = (PeerId::random(), PeerId::random());
        let cap = MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER.get();

        // Hold every permit peer_a is entitled to.
        let held: Vec<_> = (0..cap)
            .map(|_| {
                behaviour
                    .try_acquire_peer_permit(peer_a)
                    .expect("peer_a should have permits available")
            })
            .collect();

        let extra_for_a = behaviour.try_acquire_peer_permit(peer_a);
        assert!(
            extra_for_a.is_none(),
            "peer_a should be saturated at its per-peer cap",
        );
        assert!(
            behaviour.try_acquire_peer_permit(peer_b).is_some(),
            "peer_b should not be affected by peer_a's saturation",
        );

        drop(held);
        assert!(
            behaviour.try_acquire_peer_permit(peer_a).is_some(),
            "peer_a should be acquirable after permits are dropped",
        );
    }

    /// The global limiter refuses permits at its cap and hands them out
    /// again once the held permits are dropped.
    #[test]
    fn global_limiter_saturates() {
        let behaviour = new_behaviour();
        let cap = MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS.get();

        let mut held = Vec::with_capacity(cap);
        for _ in 0..cap {
            let permit = behaviour
                .try_acquire_request_permit()
                .expect("global cap not yet reached");
            held.push(permit);
        }

        assert!(
            behaviour.try_acquire_request_permit().is_none(),
            "global limiter should be saturated",
        );

        drop(held);
        assert!(
            behaviour.try_acquire_request_permit().is_some(),
            "global limiter should release permits when dropped",
        );
    }

    /// The per-peer entry survives while connections remain and disappears
    /// when the last one closes.
    #[test]
    fn per_peer_entry_removed_on_full_disconnect() {
        let mut behaviour = new_behaviour();
        let peer_a = PeerId::random();
        let is_tracked =
            |b: &ChainExchangeBehaviour| b.per_peer_limiters.contains_key(&peer_a);

        let _permit = behaviour.try_acquire_peer_permit(peer_a);
        assert!(is_tracked(&behaviour));

        // One connection still open: nothing may be removed.
        behaviour.on_peer_connection_closed(peer_a, 1);
        assert!(
            is_tracked(&behaviour),
            "entry should be retained while other connections remain",
        );

        // Last connection closed: the limiter entry goes away.
        behaviour.on_peer_connection_closed(peer_a, 0);
        assert!(
            !is_tracked(&behaviour),
            "entry should be removed when last connection closes",
        );
    }
}
19 changes: 19 additions & 0 deletions src/libp2p/chain_exchange/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::convert::TryFrom;
use crate::blocks::{BLOCK_MESSAGE_LIMIT, Block, CachingBlockHeader, FullTipset, Tipset};
use crate::message::SignedMessage;
use crate::shim::message::Message;
use crate::shim::policy::policy_constants::CHAIN_FINALITY;
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_encoding::tuple::*;
Expand Down Expand Up @@ -45,6 +46,14 @@ impl ChainExchangeRequest {
pub fn is_options_valid(&self) -> bool {
self.include_blocks() || self.include_messages()
}

/// Returns `true` when the requested number of tipsets lies in the
/// half-open interval `(0, CHAIN_FINALITY]`, i.e. at least one and at most
/// `CHAIN_FINALITY`. This mirrors Lotus's [`MaxRequestLength`].
///
/// [`MaxRequestLength`]: https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/protocol.go#L30
pub fn is_request_len_valid(&self) -> bool {
    let max_len = CHAIN_FINALITY as u64;
    (1..=max_len).contains(&self.request_len)
}
}

/// Status codes of a `chain_exchange` response.
Expand Down Expand Up @@ -120,6 +129,16 @@ pub struct ChainExchangeResponse {
}

impl ChainExchangeResponse {
/// Creates a response with status [`ChainExchangeResponseStatus::GoAway`],
/// an empty (default) chain and the given `message`.
///
/// Intended for telling the requester to back off, for example when a
/// concurrent-request cap has been reached.
pub fn go_away(message: impl Into<String>) -> Self {
    let chain = Default::default();
    let message = message.into();
    Self {
        chain,
        status: ChainExchangeResponseStatus::GoAway,
        message,
    }
}

/// Converts `chain_exchange` response into result.
/// Returns an error if the response status is not `Ok`.
/// Tipset bundle is converted into generic return type with `TryFrom` trait
Expand Down
Loading
Loading