diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bff9b06d818..424d67bf23c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ ### Fixed +- [#6972](https://github.com/ChainSafe/forest/pull/6972) `ChainExchange` hardening to limit the node memory usage. + ## Forest v0.33.1 "Paradyzja" Non-mandatory release for all node operators. It includes support for the NV28 _FireHorse_ network upgrade for devnets (not calibnet or mainnet yet), a number of significant performance improvements and bug fixes. diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md index c95b3d86c3b4..ebb579101c5a 100644 --- a/docs/docs/users/reference/env_variables.md +++ b/docs/docs/users/reference/env_variables.md @@ -9,63 +9,66 @@ Besides CLI options and the configuration values in the configuration file, there are some environment variables that control the behavior of a `forest` process. -| Environment variable | Value | Default | Example | Description | -| --------------------------------------------------------- | -------------------------------- | ---------------------------------------------- | ------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| `FOREST_KEYSTORE_PHRASE` | any text | empty | `asfvdda` | The passphrase for the encrypted keystore | -| `FOREST_CAR_LOADER_FILE_IO` | 1 or true | false | true | Load CAR files with `RandomAccessFile` instead of `Mmap` | -| `FOREST_DB_DEV_MODE` | [see here](#forest_db_dev_mode) | current | current | The database to use in development mode | -| `FOREST_ACTOR_BUNDLE_PATH` | file path | empty | `/path/to/file.car.zst` | Path to the local actor bundle, download from remote servers when not set | -| `FIL_PROOFS_PARAMETER_CACHE` | directory path | empty | 
`/var/tmp/filecoin-proof-parameters` | Path to folder that caches fil proof parameter files | -| `FOREST_PROOFS_ONLY_IPFS_GATEWAY` | 1 or true | false | 1 | Use only IPFS gateway for proofs parameters download | -| `FOREST_FORCE_TRUST_PARAMS` | 1 or true | false | 1 | Trust the parameters downloaded from the Cloudflare/IPFS | -| `IPFS_GATEWAY` | URL | `https://proofs.filecoin.io/ipfs/` | `https://proofs.filecoin.io/ipfs/` | The IPFS gateway to use for downloading proofs parameters | -| `FOREST_RPC_DEFAULT_TIMEOUT` | Duration (in seconds) | 60 | 10 | The default timeout for RPC calls | -| `FOREST_RPC_MAX_CONNECTIONS` | positive integer | 1000 | 42 | Maximum number of allowed connections for the RPC server | -| `FOREST_RPC_COMPRESS_MIN_BODY_SIZE` | integer in `[-1, 65535]` (bytes) | 1024 | 2048 (or `-1` to disable) | Minimum response body size for which HTTP compression (gzip) is applied; smaller responses are sent uncompressed. Values above 65535 are clamped to 65535. Set to a negative value (e.g. 
`-1`) to disable compression entirely | -| `FOREST_MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER` | positive integer | 10 | 10 | the maximum concurrent streams per peer for request-response-based p2p protocols | -| `FOREST_BLOCK_DELAY_SECS` | positive integer | Depends on the network | 30 | Duration of each tipset epoch | -| `FOREST_PROPAGATION_DELAY_SECS` | positive integer | Depends on the network | 20 | How long to wait for a block to propagate through the network | -| `FOREST_PLEDGE_RULE_RAMP` | positive integer | Depends on the network | 200 | Pledge rule ramp duration in epochs (FIP 0081) | -| `FOREST_MAX_FILTERS` | integer | 100 | 100 | The maximum number of filters | -| `FOREST_MAX_FILTER_RESULTS` | positive integer | 10,000 | 10000 | The maximum number of filter results | -| `FOREST_MAX_FILTER_HEIGHT_RANGE` | positive integer | 2880 | 2880 | The maximum filter height range allowed, a conservative limit of one day | -| `FOREST_STATE_MIGRATION_THREADS` | integer | Depends on the machine. | 3 | The number of threads for state migration thread-pool. Advanced users only. | -| `FOREST_CONFIG_PATH` | string | /$FOREST_HOME/com.ChainSafe.Forest/config.toml | `/path/to/config.toml` | Forest configuration path. Alternatively supplied via `--config` cli parameter. | -| `FOREST_TEST_RNG_FIXED_SEED` | non-negative integer | empty | 0 | Override RNG with a reproducible one seeded by the value. This should never be used out of test context for security. | -| `RUST_LOG` | string | empty | `debug,forest_libp2p::service=info` | Allows for log level customization. | -| `FOREST_IGNORE_DRAND` | 1 or true | empty | 1 | Ignore Drand validation. | -| `FOREST_LIBP2P_METRICS_ENABLED` | 1 or true | empty | 1 | Include `libp2p` metrics in Forest's Prometheus output. | -| `FOREST_F3_SIDECAR_RPC_ENDPOINT` | string | 127.0.0.1:23456 | `127.0.0.1:23456` | An RPC endpoint of F3 sidecar. 
| -| `FOREST_F3_SIDECAR_FFI_ENABLED` | 1 or true | hard-coded per chain | 1 | Whether or not to start the F3 sidecar via FFI | -| `FOREST_F3_CONSENSUS_ENABLED` | 1 or true | hard-coded per chain | 1 | Whether or not to apply the F3 consensus to the node | -| `FOREST_F3_FINALITY` | integer | inherited from chain configuration | 900 | Set the chain finality epochs in F3 manifest | -| `FOREST_F3_PERMANENT_PARTICIPATING_MINER_ADDRESSES` | comma delimited strings | empty | `t0100,t0101` | Set the miner addresses that participate in F3 permanently | -| `FOREST_F3_INITIAL_POWER_TABLE` | string | empty | `bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i` | Set the F3 initial power table CID | -| `FOREST_F3_ROOT` | string | [FOREST_DATA_ROOT]/f3 | `/var/tmp/f3` | Set the data directory for F3 | -| `FOREST_F3_BOOTSTRAP_EPOCH` | integer | -1 | 100 | Set the bootstrap epoch for F3 | -| `FOREST_DRAND_MAINNET_CONFIG` | string | empty | refer to Drand config format section | Override `DRAND_MAINNET` config | -| `FOREST_DRAND_QUICKNET_CONFIG` | string | empty | refer to Drand config format section | Override `DRAND_QUICKNET` config | -| `FOREST_DRAND_INCENTINET_CONFIG` | string | empty | refer to Drand config format section | Override `DRAND_INCENTINET` config | -| `FOREST_TRACE_FILTER_MAX_RESULT` | positive integer | 500 | 1000 | Sets the maximum results returned per request by `trace_filter` | -| `FOREST_CHAIN_INDEXER_ENABLED` | 1 or true | false | 1 | Whether or not to index the chain to support the Ethereum RPC API | -| `FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE` | positive integer | 100 | 42 | The size of an internal cache of tipsets to messages | -| `FOREST_STATE_MIGRATION_DB_WRITE_BUFFER` | non-negative integer | 10000 | 100000 | The size of db write buffer for state migration (`~10MB` RAM per `10k` buffer) | -| `FOREST_SNAPSHOT_GC_INTERVAL_EPOCHS` | non-negative integer | 20160 | 8000 | The interval in epochs for scheduling snapshot GC | -| 
`FOREST_SNAPSHOT_GC_CHECK_INTERVAL_SECONDS` | non-negative integer | 300 | 60 | The interval in seconds for checking if snapshot GC should run | -| `FOREST_SNAPSHOT_GC_KEEP_STATE_TREE_EPOCHS` | non-negative integer | 2000 | 20160 | The number of most recent epochs of state trees to keep after GC | -| `FOREST_DISABLE_BAD_BLOCK_CACHE` | 1 or true | empty | 1 | Whether or not to disable bad block cache | -| `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` | positive integer | 268435456 | 536870912 | The default zstd frame cache max size in bytes | -| `FOREST_JWT_DISABLE_EXP_VALIDATION` | 1 or true | empty | 1 | Whether or not to disable JWT expiration validation | -| `FOREST_ETH_BLOCK_CACHE_SIZE` | positive integer | 500 | 1 | The size of Eth block cache | -| `FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK` | 1 or true | false | 1 | Whether or not to backfill full tipsets from the p2p network | -| `FOREST_STRICT_JSON` | 1 or true | false | 1 | Enable strict JSON validation to detect duplicate keys and reject unknown fields in RPC requests and responses | -| `FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH` | URL or file path | empty | `/var/tmp/forest_snapshot_calibnet.forest.car.zst` | Override snapshot path for `--auto-download-snapshot` | -| `FOREST_DOWNLOAD_CONNECTIONS` | positive integer | 5 | 10 | Number of parallel HTTP connections for downloading snapshots | -| `FOREST_ETH_V1_DISABLE_F3_FINALITY_RESOLUTION` | 1 or true | empty | 1 | Whether or not to disable F3 finality resolution in Eth `v1` RPC methods | -| `FOREST_GENESIS_NETWORK_VERSION` | non-negative integer | empty | 25 | Override the genesis network version (devnet only) | -| `FOREST_TIPSET_CACHE_DISABLED` | 1 or true | empty | 1 | Disable the tipset cache. 
Used internally by development and tool subcommands | -| `FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS` | positive integer | 3 | 3 | number of max concurrent requests to send over chain exchange protocol | -| `FOREST_FEES_FIP0115HEIGHT` | integer | -1 | 100 | FIP-0115 base fee activation epoch. Set to -1 to disable. **Consensus-breaking, for testing only.** | +| Environment variable | Value | Default | Example | Description | +| ---------------------------------------------------------------- | -------------------------------- | ---------------------------------------------- | ------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| `FOREST_KEYSTORE_PHRASE` | any text | empty | `asfvdda` | The passphrase for the encrypted keystore | +| `FOREST_CAR_LOADER_FILE_IO` | 1 or true | false | true | Load CAR files with `RandomAccessFile` instead of `Mmap` | +| `FOREST_DB_DEV_MODE` | [see here](#forest_db_dev_mode) | current | current | The database to use in development mode | +| `FOREST_ACTOR_BUNDLE_PATH` | file path | empty | `/path/to/file.car.zst` | Path to the local actor bundle, download from remote servers when not set | +| `FIL_PROOFS_PARAMETER_CACHE` | directory path | empty | `/var/tmp/filecoin-proof-parameters` | Path to folder that caches fil proof parameter files | +| `FOREST_PROOFS_ONLY_IPFS_GATEWAY` | 1 or true | false | 1 | Use only IPFS gateway for proofs parameters download | +| `FOREST_FORCE_TRUST_PARAMS` | 1 or true | false | 1 | Trust the parameters downloaded from the Cloudflare/IPFS | +| `IPFS_GATEWAY` | URL | `https://proofs.filecoin.io/ipfs/` | `https://proofs.filecoin.io/ipfs/` | The IPFS gateway to use for downloading proofs parameters | +| `FOREST_RPC_DEFAULT_TIMEOUT` | Duration (in seconds) | 60 | 10 | The 
default timeout for RPC calls | +| `FOREST_RPC_MAX_CONNECTIONS` | positive integer | 1000 | 42 | Maximum number of allowed connections for the RPC server | +| `FOREST_RPC_COMPRESS_MIN_BODY_SIZE` | integer in `[-1, 65535]` (bytes) | 1024 | 2048 (or `-1` to disable) | Minimum response body size for which HTTP compression (gzip) is applied; smaller responses are sent uncompressed. Values above 65535 are clamped to 65535. Set to a negative value (e.g. `-1`) to disable compression entirely | +| `FOREST_MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER` | positive integer | 10 | 10 | the maximum concurrent streams per peer for request-response-based p2p protocols | +| `FOREST_BLOCK_DELAY_SECS` | positive integer | Depends on the network | 30 | Duration of each tipset epoch | +| `FOREST_PROPAGATION_DELAY_SECS` | positive integer | Depends on the network | 20 | How long to wait for a block to propagate through the network | +| `FOREST_PLEDGE_RULE_RAMP` | positive integer | Depends on the network | 200 | Pledge rule ramp duration in epochs (FIP 0081) | +| `FOREST_MAX_FILTERS` | integer | 100 | 100 | The maximum number of filters | +| `FOREST_MAX_FILTER_RESULTS` | positive integer | 10,000 | 10000 | The maximum number of filter results | +| `FOREST_MAX_FILTER_HEIGHT_RANGE` | positive integer | 2880 | 2880 | The maximum filter height range allowed, a conservative limit of one day | +| `FOREST_STATE_MIGRATION_THREADS` | integer | Depends on the machine. | 3 | The number of threads for state migration thread-pool. Advanced users only. | +| `FOREST_CONFIG_PATH` | string | /$FOREST_HOME/com.ChainSafe.Forest/config.toml | `/path/to/config.toml` | Forest configuration path. Alternatively supplied via `--config` cli parameter. | +| `FOREST_TEST_RNG_FIXED_SEED` | non-negative integer | empty | 0 | Override RNG with a reproducible one seeded by the value. This should never be used out of test context for security. 
| +| `RUST_LOG` | string | empty | `debug,forest_libp2p::service=info` | Allows for log level customization. | +| `FOREST_IGNORE_DRAND` | 1 or true | empty | 1 | Ignore Drand validation. | +| `FOREST_LIBP2P_METRICS_ENABLED` | 1 or true | empty | 1 | Include `libp2p` metrics in Forest's Prometheus output. | +| `FOREST_F3_SIDECAR_RPC_ENDPOINT` | string | 127.0.0.1:23456 | `127.0.0.1:23456` | An RPC endpoint of F3 sidecar. | +| `FOREST_F3_SIDECAR_FFI_ENABLED` | 1 or true | hard-coded per chain | 1 | Whether or not to start the F3 sidecar via FFI | +| `FOREST_F3_CONSENSUS_ENABLED` | 1 or true | hard-coded per chain | 1 | Whether or not to apply the F3 consensus to the node | +| `FOREST_F3_FINALITY` | integer | inherited from chain configuration | 900 | Set the chain finality epochs in F3 manifest | +| `FOREST_F3_PERMANENT_PARTICIPATING_MINER_ADDRESSES` | comma delimited strings | empty | `t0100,t0101` | Set the miner addresses that participate in F3 permanently | +| `FOREST_F3_INITIAL_POWER_TABLE` | string | empty | `bafyreicmaj5hhoy5mgqvamfhgexxyergw7hdeshizghodwkjg6qmpoco7i` | Set the F3 initial power table CID | +| `FOREST_F3_ROOT` | string | [FOREST_DATA_ROOT]/f3 | `/var/tmp/f3` | Set the data directory for F3 | +| `FOREST_F3_BOOTSTRAP_EPOCH` | integer | -1 | 100 | Set the bootstrap epoch for F3 | +| `FOREST_DRAND_MAINNET_CONFIG` | string | empty | refer to Drand config format section | Override `DRAND_MAINNET` config | +| `FOREST_DRAND_QUICKNET_CONFIG` | string | empty | refer to Drand config format section | Override `DRAND_QUICKNET` config | +| `FOREST_DRAND_INCENTINET_CONFIG` | string | empty | refer to Drand config format section | Override `DRAND_INCENTINET` config | +| `FOREST_TRACE_FILTER_MAX_RESULT` | positive integer | 500 | 1000 | Sets the maximum results returned per request by `trace_filter` | +| `FOREST_CHAIN_INDEXER_ENABLED` | 1 or true | false | 1 | Whether or not to index the chain to support the Ethereum RPC API | +| 
`FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE` | positive integer | 100 | 42 | The size of an internal cache of tipsets to messages | +| `FOREST_STATE_MIGRATION_DB_WRITE_BUFFER` | non-negative integer | 10000 | 100000 | The size of db write buffer for state migration (`~10MB` RAM per `10k` buffer) | +| `FOREST_SNAPSHOT_GC_INTERVAL_EPOCHS` | non-negative integer | 20160 | 8000 | The interval in epochs for scheduling snapshot GC | +| `FOREST_SNAPSHOT_GC_CHECK_INTERVAL_SECONDS` | non-negative integer | 300 | 60 | The interval in seconds for checking if snapshot GC should run | +| `FOREST_SNAPSHOT_GC_KEEP_STATE_TREE_EPOCHS` | non-negative integer | 2000 | 20160 | The number of most recent epochs of state trees to keep after GC | +| `FOREST_DISABLE_BAD_BLOCK_CACHE` | 1 or true | empty | 1 | Whether or not to disable bad block cache | +| `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` | positive integer | 268435456 | 536870912 | The default zstd frame cache max size in bytes | +| `FOREST_JWT_DISABLE_EXP_VALIDATION` | 1 or true | empty | 1 | Whether or not to disable JWT expiration validation | +| `FOREST_ETH_BLOCK_CACHE_SIZE` | positive integer | 500 | 1 | The size of Eth block cache | +| `FOREST_RPC_BACKFILL_FULL_TIPSET_FROM_NETWORK` | 1 or true | false | 1 | Whether or not to backfill full tipsets from the p2p network | +| `FOREST_STRICT_JSON` | 1 or true | false | 1 | Enable strict JSON validation to detect duplicate keys and reject unknown fields in RPC requests and responses | +| `FOREST_AUTO_DOWNLOAD_SNAPSHOT_PATH` | URL or file path | empty | `/var/tmp/forest_snapshot_calibnet.forest.car.zst` | Override snapshot path for `--auto-download-snapshot` | +| `FOREST_DOWNLOAD_CONNECTIONS` | positive integer | 5 | 10 | Number of parallel HTTP connections for downloading snapshots | +| `FOREST_ETH_V1_DISABLE_F3_FINALITY_RESOLUTION` | 1 or true | empty | 1 | Whether or not to disable F3 finality resolution in Eth `v1` RPC methods | +| `FOREST_GENESIS_NETWORK_VERSION` | non-negative 
integer | empty | 25 | Override the genesis network version (devnet only) | +| `FOREST_TIPSET_CACHE_DISABLED` | 1 or true | empty | 1 | Disable the tipset cache. Used internally by development and tool subcommands | +| `FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS` | positive integer | 3 | 3 | Maximum number of concurrent **outbound** chain exchange requests sent by chain sync to the network | +| `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS` | positive integer | 32 | 32 | Maximum number of inbound chain exchange requests Forest will service concurrently. Excess requests are rejected with a `GoAway` response | +| `FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER` | positive integer | 4 | 4 | Per-peer cap on concurrent inbound chain exchange requests. Excess requests from a single peer are rejected with a `GoAway` response | +| `FOREST_MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES` | positive integer (bytes) | 10485760 (10 MiB) | 10485760 | Cap on the encoded byte size of a chain exchange response Forest serves to peers. Building stops as soon as the running encoded size would exceed this cap and the response is returned with `PartialResponse` status. The first tipset bundle is always included, so a response can exceed the cap when that single bundle is larger than it | +| `FOREST_FEES_FIP0115HEIGHT` | integer | -1 | 100 | FIP-0115 base fee activation epoch. Set to -1 to disable. 
**Consensus-breaking, for testing only.** | ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT` diff --git a/src/libp2p/chain_exchange/behaviour.rs b/src/libp2p/chain_exchange/behaviour.rs index ac84d260411c..0ba39a76d56f 100644 --- a/src/libp2p/chain_exchange/behaviour.rs +++ b/src/libp2p/chain_exchange/behaviour.rs @@ -1,6 +1,11 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use std::{ + num::NonZeroUsize, + sync::{Arc, LazyLock}, +}; + use ahash::HashMap; use libp2p::{ PeerId, @@ -9,19 +14,46 @@ use libp2p::{ }, swarm::{NetworkBehaviour, THandlerOutEvent, derive_prelude::*}, }; +use nonzero_ext::nonzero; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tracing::debug; use super::*; -use crate::libp2p::{rpc::RequestResponseError, service::metrics}; +use crate::{ + libp2p::{rpc::RequestResponseError, service::metrics}, + utils::misc::env::env_or_default_logged, +}; type InnerBehaviour = request_response::Behaviour; +/// Maximum number of concurrent inbound chain exchange requests Forest will +/// service. Excess requests are rejected with [`ChainExchangeResponseStatus::GoAway`]. +static MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS: LazyLock = + LazyLock::new(|| { + env_or_default_logged( + "FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS", + nonzero!(32_usize), + ) + }); + +/// Per-peer cap on concurrent inbound chain exchange requests. Excess requests +/// from a single peer are rejected with [`ChainExchangeResponseStatus::GoAway`]. 
+static MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER: LazyLock = + LazyLock::new(|| { + env_or_default_logged( + "FOREST_MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER", + nonzero!(4_usize), + ) + }); + pub struct ChainExchangeBehaviour { inner: InnerBehaviour, response_channels: HashMap< OutboundRequestId, flume::Sender>, >, + request_limiter: Arc, + per_peer_limiters: HashMap>, } impl ChainExchangeBehaviour { @@ -32,6 +64,34 @@ impl ChainExchangeBehaviour { cfg, ), response_channels: Default::default(), + request_limiter: Arc::new(Semaphore::new( + MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS.get(), + )), + per_peer_limiters: Default::default(), + } + } + + pub fn try_acquire_request_permit(&self) -> Option { + self.request_limiter.clone().try_acquire_owned().ok() + } + + /// Lazily creates a per-peer semaphore on first request from `peer`. + pub fn try_acquire_peer_permit(&mut self, peer: PeerId) -> Option { + self.per_peer_limiters + .entry(peer) + .or_insert_with(|| { + Arc::new(Semaphore::new( + MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER.get(), + )) + }) + .clone() + .try_acquire_owned() + .ok() + } + + fn on_peer_connection_closed(&mut self, peer: PeerId, remaining_established: usize) { + if remaining_established == 0 { + self.per_peer_limiters.remove(&peer); } } @@ -164,6 +224,9 @@ impl NetworkBehaviour for ChainExchangeBehaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { + if let FromSwarm::ConnectionClosed(c) = &event { + self.on_peer_connection_closed(c.peer_id, c.remaining_established); + } self.inner.on_swarm_event(event) } @@ -174,3 +237,86 @@ impl NetworkBehaviour for ChainExchangeBehaviour { self.inner.poll(cx) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn new_behaviour() -> ChainExchangeBehaviour { + ChainExchangeBehaviour::new(request_response::Config::default()) + } + + #[test] + fn per_peer_limiter_saturates_independently() { + let mut behaviour = new_behaviour(); + let peer_a = 
PeerId::random(); + let peer_b = PeerId::random(); + let cap = MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS_PER_PEER.get(); + + let mut permits_a = Vec::new(); + for _ in 0..cap { + permits_a.push( + behaviour + .try_acquire_peer_permit(peer_a) + .expect("peer_a should have permits available"), + ); + } + assert!( + behaviour.try_acquire_peer_permit(peer_a).is_none(), + "peer_a should be saturated at its per-peer cap", + ); + assert!( + behaviour.try_acquire_peer_permit(peer_b).is_some(), + "peer_b should not be affected by peer_a's saturation", + ); + + permits_a.clear(); + assert!( + behaviour.try_acquire_peer_permit(peer_a).is_some(), + "peer_a should be acquirable after permits are dropped", + ); + } + + #[test] + fn global_limiter_saturates() { + let behaviour = new_behaviour(); + let cap = MAX_CONCURRENT_INBOUND_CHAIN_EXCHANGE_REQUESTS.get(); + + let permits: Vec<_> = (0..cap) + .map(|_| { + behaviour + .try_acquire_request_permit() + .expect("global cap not yet reached") + }) + .collect(); + assert!( + behaviour.try_acquire_request_permit().is_none(), + "global limiter should be saturated", + ); + drop(permits); + assert!( + behaviour.try_acquire_request_permit().is_some(), + "global limiter should release permits when dropped", + ); + } + + #[test] + fn per_peer_entry_removed_on_full_disconnect() { + let mut behaviour = new_behaviour(); + let peer_a = PeerId::random(); + let _permit = behaviour.try_acquire_peer_permit(peer_a); + assert!(behaviour.per_peer_limiters.contains_key(&peer_a)); + + behaviour.on_peer_connection_closed(peer_a, 1); + assert!( + behaviour.per_peer_limiters.contains_key(&peer_a), + "entry should be retained while other connections remain", + ); + + behaviour.on_peer_connection_closed(peer_a, 0); + assert!( + !behaviour.per_peer_limiters.contains_key(&peer_a), + "entry should be removed when last connection closes", + ); + } +} diff --git a/src/libp2p/chain_exchange/message.rs b/src/libp2p/chain_exchange/message.rs index 
08c72dbbf4f9..5d9c7e770304 100644 --- a/src/libp2p/chain_exchange/message.rs +++ b/src/libp2p/chain_exchange/message.rs @@ -6,6 +6,7 @@ use std::convert::TryFrom; use crate::blocks::{BLOCK_MESSAGE_LIMIT, Block, CachingBlockHeader, FullTipset, Tipset}; use crate::message::SignedMessage; use crate::shim::message::Message; +use crate::shim::policy::policy_constants::CHAIN_FINALITY; use anyhow::Context as _; use cid::Cid; use fvm_ipld_encoding::tuple::*; @@ -45,6 +46,14 @@ impl ChainExchangeRequest { pub fn is_options_valid(&self) -> bool { self.include_blocks() || self.include_messages() } + + /// Checks if the request length is within `(0, CHAIN_FINALITY]`, matching + /// Lotus's [`MaxRequestLength`]. + /// + /// [`MaxRequestLength`]: https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/protocol.go#L30 + pub fn is_request_len_valid(&self) -> bool { + self.request_len > 0 && self.request_len <= CHAIN_FINALITY as u64 + } } /// Status codes of a `chain_exchange` response. @@ -120,6 +129,16 @@ pub struct ChainExchangeResponse { } impl ChainExchangeResponse { + /// Build a [`ChainExchangeResponseStatus::GoAway`] response asking the + /// requester to back off (e.g. when concurrent-request caps are reached). + pub fn go_away(message: impl Into) -> Self { + Self { + chain: Default::default(), + status: ChainExchangeResponseStatus::GoAway, + message: message.into(), + } + } + /// Converts `chain_exchange` response into result. /// Returns an error if the response status is not `Ok`. 
/// Tipset bundle is converted into generic return type with `TryFrom` trait diff --git a/src/libp2p/chain_exchange/provider.rs b/src/libp2p/chain_exchange/provider.rs index cf16b525c15f..906f39edae33 100644 --- a/src/libp2p/chain_exchange/provider.rs +++ b/src/libp2p/chain_exchange/provider.rs @@ -1,17 +1,54 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::blocks::{Tipset, TipsetKey}; -use crate::chain::{ChainStore, Error as ChainError}; +use std::{io, num::NonZeroUsize, sync::LazyLock}; + use ahash::{HashMap, HashMapExt}; use cid::Cid; use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; +use nonzero_ext::nonzero; use super::{ ChainExchangeRequest, ChainExchangeResponse, ChainExchangeResponseStatus, CompactedMessages, TipsetBundle, }; +use crate::{ + blocks::{Tipset, TipsetKey}, + chain::{ChainStore, Error as ChainError}, + utils::misc::env::env_or_default_logged, +}; + +/// Maximum encoded byte size of a chain-exchange response we serve to peers. +/// Building stops as soon as the running encoded size would exceed this cap; +/// the response is returned with status +/// [`ChainExchangeResponseStatus::PartialResponse`]. +static MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES: LazyLock = LazyLock::new(|| { + env_or_default_logged( + "FOREST_MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES", + nonzero!(10 * 1024 * 1024_usize), + ) +}); + +/// `io::Write` that discards the bytes and only tracks how many were written. +struct CountingSink(usize); + +impl io::Write for CountingSink { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0 += buf.len(); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +fn encoded_size(value: &T) -> Result { + let mut sink = CountingSink(0); + fvm_ipld_encoding::to_writer(&mut sink, value)?; + Ok(sink.0) +} /// Builds chain exchange response out of chain data. 
pub fn make_chain_exchange_response( @@ -21,11 +58,26 @@ pub fn make_chain_exchange_response( where DB: Blockstore + Send + Sync + 'static, { - if !request.is_options_valid() { + make_chain_exchange_response_with_cap( + cs, + request, + MAX_OUTBOUND_CHAIN_EXCHANGE_RESPONSE_BYTES.get(), + ) +} + +fn make_chain_exchange_response_with_cap( + cs: &ChainStore, + request: &ChainExchangeRequest, + max_bytes: usize, +) -> ChainExchangeResponse +where + DB: Blockstore + Send + Sync + 'static, +{ + if !request.is_options_valid() || !request.is_request_len_valid() { return ChainExchangeResponse { chain: Default::default(), status: ChainExchangeResponseStatus::BadRequest, - message: format!("Invalid options {}", request.options), + message: format!("Invalid chain exchange request {request:?}"), }; } @@ -44,22 +96,27 @@ where } }; - let chain: Vec<_> = root - .chain(cs.blockstore()) - .take(request.request_len as _) - .map(|tipset| { - let mut tipset_bundle: TipsetBundle = TipsetBundle::default(); - if request.include_messages() { - tipset_bundle.messages = Some(compact_messages(cs.blockstore(), &tipset)?); - } + let mut chain: Vec = Vec::with_capacity(request.request_len as usize); + let mut accumulated: usize = 0; - if request.include_blocks() { - tipset_bundle.blocks = tipset.block_headers().iter().cloned().collect_vec(); - } + for tipset in root.chain(cs.blockstore()).take(request.request_len as _) { + let mut tipset_bundle: TipsetBundle = TipsetBundle::default(); + if request.include_messages() { + tipset_bundle.messages = Some(compact_messages(cs.blockstore(), &tipset)?); + } + if request.include_blocks() { + tipset_bundle.blocks = tipset.block_headers().iter().cloned().collect_vec(); + } - anyhow::Ok(tipset_bundle) - }) - .try_collect()?; + let bundle_bytes = encoded_size(&tipset_bundle)?; + // Always include the first bundle so a peer can make forward + // progress even if a single tipset exceeds the cap. 
+ if !chain.is_empty() && accumulated + bundle_bytes > max_bytes { + break; + } + accumulated += bundle_bytes; + chain.push(tipset_bundle); + } anyhow::Ok(ChainExchangeResponse { status: if request.request_len > chain.len() as u64 { @@ -157,31 +214,31 @@ mod tests { use nunny::Vec as NonEmpty; use std::{io::Cursor, sync::Arc}; - async fn populate_db() -> (NonEmpty, Arc) { + async fn populate_chain_store() -> (NonEmpty, ChainStore) { let db = Arc::new(MemoryDB::default()); - // The cids are the tipset cids of the most recent tipset (39th) + // The cids are the tipset cids of the most recent tipset (39th). let header = load_car(&db, Cursor::new(EXPORT_SR_40)).await.unwrap(); - (header.roots, db) - } - - #[tokio::test] - async fn compact_messages_test() { - let (cids, db) = populate_db().await; - let gen_block = CachingBlockHeader::new(RawBlockHeader { miner_address: Address::new_id(0), ..Default::default() }); + let cs = ChainStore::new( + db.clone(), + db.clone(), + db, + Arc::new(ChainConfig::default()), + gen_block, + ) + .unwrap(); + (header.roots, cs) + } + + #[tokio::test] + async fn compact_messages_test() { + let (cids, cs) = populate_chain_store().await; let response = make_chain_exchange_response( - &ChainStore::new( - db.clone(), - db.clone(), - db, - Arc::new(ChainConfig::default()), - gen_block, - ) - .unwrap(), + &cs, &ChainExchangeRequest { start: cids, request_len: 2, @@ -238,4 +295,49 @@ mod tests { assert_eq!(ts_38_msgs.secp_msg_includes[1].len(), 1); assert_eq!(ts_38_msgs.bls_msg_includes[1].len(), 11); } + + #[tokio::test] + async fn response_byte_cap_truncates_to_partial() { + let (cids, cs) = populate_chain_store().await; + + // A 1-byte cap exercises the always-include-first invariant. 
+ let response = make_chain_exchange_response_with_cap( + &cs, + &ChainExchangeRequest { + start: cids, + request_len: 5, + options: HEADERS | MESSAGES, + }, + 1, + ); + + assert_eq!(response.chain.len(), 1); + assert_eq!( + response.status, + ChainExchangeResponseStatus::PartialResponse + ); + } + + #[tokio::test] + async fn counting_sink_matches_to_vec() { + // Sanity: the sink's running count equals what `to_vec` produces, so the + // budget we apply is the same byte count we'd actually write on the wire. + // A real populated response exercises Vec, byte-array, and nested-struct + // serializations rather than a trivial primitive. + let (cids, cs) = populate_chain_store().await; + let response = make_chain_exchange_response( + &cs, + &ChainExchangeRequest { + start: cids, + request_len: 2, + options: HEADERS | MESSAGES, + }, + ); + + let mut sink = CountingSink(0); + fvm_ipld_encoding::to_writer(&mut sink, &response).unwrap(); + let to_vec_len = fvm_ipld_encoding::to_vec(&response).unwrap().len(); + assert!(sink.0 > 0, "expected a non-empty encoded response"); + assert_eq!(sink.0, to_vec_len); + } } diff --git a/src/libp2p/rpc/mod.rs b/src/libp2p/rpc/mod.rs index 6b2099a96f34..c79bc71bf42b 100644 --- a/src/libp2p/rpc/mod.rs +++ b/src/libp2p/rpc/mod.rs @@ -99,8 +99,13 @@ where where T: AsyncRead + Unpin + Send, { - let mut bytes = vec![]; - io.read_to_end(&mut bytes).await?; + // Cap buffered bytes per response to bound memory exposure. Over-cap + // streams get cut off; the decode then fails and chain-sync retries + // another peer. 
Matches Lotus's `maxExchangeMessageSize`: + // https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/exchange/client.go#L30 + const MAX_RESPONSE_BYTES: u64 = 120 * 1024 * 1024; + let mut bytes = Vec::with_capacity(64 * 1024); + io.take(MAX_RESPONSE_BYTES).read_to_end(&mut bytes).await?; serde_ipld_dagcbor::de::from_reader(bytes.as_slice()).map_err(io::Error::other) } diff --git a/src/libp2p/service.rs b/src/libp2p/service.rs index 176457dc63ad..2901759f7190 100644 --- a/src/libp2p/service.rs +++ b/src/libp2p/service.rs @@ -811,6 +811,23 @@ async fn handle_chain_exchange_event( channel, request_id, } => { + let Some(per_peer_permit) = chain_exchange.try_acquire_peer_permit(peer) else { + debug!("Rejecting chain_exchange request from {peer}: per-peer cap reached"); + let _ = chain_exchange.send_response( + channel, + ChainExchangeResponse::go_away("per-peer concurrent request cap reached"), + ); + return; + }; + let Some(global_permit) = chain_exchange.try_acquire_request_permit() else { + debug!("Rejecting chain_exchange request from {peer}: global cap reached"); + let _ = chain_exchange.send_response( + channel, + ChainExchangeResponse::go_away("global concurrent request cap reached"), + ); + return; + }; + trace!( "Received chain_exchange request (request_id:{request_id}, peer_id: {peer:?})", ); @@ -822,6 +839,8 @@ async fn handle_chain_exchange_event( let db = db.clone(); tokio::task::spawn(async move { + let _per_peer_permit = per_peer_permit; + let _global_permit = global_permit; if let Err(e) = cx_response_tx.send(( request_id, channel, diff --git a/src/utils/misc/env.rs b/src/utils/misc/env.rs index 220f172ebe65..df194139f704 100644 --- a/src/utils/misc/env.rs +++ b/src/utils/misc/env.rs @@ -1,7 +1,7 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::str::FromStr; +use std::{fmt::Display, str::FromStr}; /// Get the value of an environment variable, or a default value if it is not set or cannot be /// 
parsed. @@ -12,6 +12,18 @@ pub fn env_or_default(key: &str, default: T) -> T { .unwrap_or(default) } +/// Like [`env_or_default`], but logs at `info` level when the env var was used, +/// so the operator can confirm the override took effect. +pub fn env_or_default_logged(key: &str, default: T) -> T { + match std::env::var(key).ok().and_then(|v| v.parse().ok()) { + Some(value) => { + tracing::info!("`{key}` set to {value}"); + value + } + None => default, + } +} + /// Check if the given environment variable is set to truthy value. /// Returns false if not set. pub fn is_env_truthy(env: &str) -> bool {