Skip to content

Commit

Permalink
Experimental - Reduce # of connections effectively used to gossip tra…
Browse files Browse the repository at this point in the history
…nsactions out (#1558)

* maxpeers for mempool

* mempool: fix max_peers bcast routine active flag

* Use semaphore to limit concurrency

* Rename MaxPeers to MaxOutboundPeers

* Add max_outbound_peers to config toml template

* Rename in error message

* Rename the parameter to highlight its experimental nature. Extend the AddPeer method to return an error. Move the semaphore outside the broadcast routine

* reverting the addition of error to AddPeer. It fails if the context is done and handling this case will be done some other time, when an actual context is passed into acquire.

* reverting the addition of error to AddPeer. It fails if the context is done and handling this case will be done some other time, when an actual context is passed into acquire.

* Fixing lint issue

* renaming semaphore to something more meaningful

* make default value 0, which is the same as the current behavior. 10 is the recommended value.

* adding new flag to manifest.go

* Adding changelog

* Improve the description of the parameter in the generated config file.

* Add metric to track the current number of active connections.

* Change metric to gauge type and rename it.

* e2e: Allow disabling the PEX reactor on all nodes in the testnet

* Apply suggestions from code review

Co-authored-by: Sergio Mena <sergio@informal.systems>

* Update config/config.go comment

* fix lint error

* Improve config description

* Rename metric (remove experimental prefix)

* Add unit test

* Improve unit test

* Update mempool/reactor.go comment

---------

Co-authored-by: Ethan Buchman <ethan@coinculture.info>
Co-authored-by: Daniel Cason <daniel.cason@informal.systems>
Co-authored-by: lasarojc <lasaro@informal.systems>
Co-authored-by: hvanz <hernan.vanzetto@gmail.com>
Co-authored-by: Andy Nogueira <me@andynogueira.dev>
Co-authored-by: Sergio Mena <sergio@informal.systems>
  • Loading branch information
7 people committed Nov 10, 2023
1 parent bc4e446 commit c3ea1e8
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 84 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[mempool]` Only gossip transactions to a subset of the connected peers,
of size `experimental_max_used_outbound_peers`
([\#1558](https://github.com/cometbft/cometbft/pull/1558))
21 changes: 17 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,15 @@ type MempoolConfig struct {
// Including space needed by encoding (one varint per transaction).
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
MaxBatchBytes int `mapstructure:"max_batch_bytes"`
// Experimental parameter to limit broadcast of txs to up to this many peers.
// If we are connected to more than this number of peers, only send txs to the first
// ExperimentalMaxUsedOutboundPeers of them. If one of those peers disconnects, activate another
// peer.
// If set to 0, this feature is disabled, that is, the number of active connections is not
// bounded.
// If enabled, a value of 10 is recommended based on experimental performance results using the
// default P2P configuration.
ExperimentalMaxUsedOutboundPeers int `mapstructure:"experimental_max_used_outbound_peers"`
}

// DefaultMempoolConfig returns a default configuration for the CometBFT mempool
Expand All @@ -901,10 +910,11 @@ func DefaultMempoolConfig() *MempoolConfig {
WalPath: "",
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
ExperimentalMaxUsedOutboundPeers: 0,
}
}

Expand Down Expand Up @@ -940,6 +950,9 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.MaxTxBytes < 0 {
return errors.New("max_tx_bytes can't be negative")
}
if cfg.ExperimentalMaxUsedOutboundPeers < 0 {
return cmterrors.ErrNegativeField{Field: "experimental_max_used_outbound_peers"}
}
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,14 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }}
# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
max_batch_bytes = {{ .Mempool.MaxBatchBytes }}
# Experimental parameter to limit broadcast of txs to up to this many peers
# If we are connected to more than this number of peers, only send txs to
# the first experimental_max_used_outbound_peers of them. If one of those peers goes
# offline, activate another peer.
# Value 0 disables the feature by not limiting the number of active connections.
# If you enable this feature, a value of 10 is recommended based on experimental performance results.
experimental_max_used_outbound_peers = {{ .Mempool.ExperimentalMaxUsedOutboundPeers }}
#######################################################
### State Sync Configuration Options ###
#######################################################
Expand Down
19 changes: 13 additions & 6 deletions mempool/metrics.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ type Metrics struct {

// Number of times transactions are rechecked in the mempool.
RecheckTimes metrics.Counter

// Number of connections being actively used for gossiping transactions
// (experimental feature).
ActiveOutboundConnections metrics.Gauge
}
24 changes: 23 additions & 1 deletion mempool/reactor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mempool

import (
"context"
"errors"
"fmt"
"time"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/cometbft/cometbft/p2p"
protomem "github.com/cometbft/cometbft/proto/tendermint/mempool"
"github.com/cometbft/cometbft/types"
"golang.org/x/sync/semaphore"
)

// Reactor handles mempool tx broadcasting amongst peers.
Expand All @@ -21,6 +23,8 @@ type Reactor struct {
config *cfg.MempoolConfig
mempool *CListMempool
ids *mempoolIDs

activeConnectionsSemaphore *semaphore.Weighted
}

// NewReactor returns a new Reactor with the given config and mempool.
Expand All @@ -31,6 +35,8 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
memR.activeConnectionsSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxUsedOutboundPeers))

return memR
}

Expand Down Expand Up @@ -78,7 +84,22 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
// Nothing is started when config.Broadcast is false.
//
// When config.ExperimentalMaxUsedOutboundPeers is greater than 0, the routine
// must first acquire one slot from activeConnectionsSemaphore before it begins
// broadcasting, and it gives the slot back when the broadcast routine returns.
// The ActiveOutboundConnections gauge is incremented and decremented together
// with the slot.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
	if !memR.config.Broadcast {
		return
	}

	// A single goroutine per peer: it is the only place that starts
	// broadcastTxRoutine, so the limit below applies to every peer.
	go func() {
		if memR.config.ExperimentalMaxUsedOutboundPeers > 0 {
			// Goroutines for peers beyond the configured limit block here until
			// the broadcast routine of another peer returns and frees its slot.
			if err := memR.activeConnectionsSemaphore.Acquire(context.TODO(), 1); err != nil {
				// The logger takes a message followed by key/value pairs, not a
				// printf-style format string, so the error is passed as a value.
				memR.Logger.Error("Failed to acquire semaphore", "err", err)
				return
			}
			memR.mempool.metrics.ActiveOutboundConnections.Add(1)
			defer func() {
				memR.activeConnectionsSemaphore.Release(1)
				memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
			}()
		}
		memR.broadcastTxRoutine(peer)
	}()
}

Expand Down Expand Up @@ -138,6 +159,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
if !memR.IsRunning() || !peer.IsRunning() {
return
}

// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWait() returned nil. Go ahead and
// start from the beginning.
Expand Down
Loading

0 comments on commit c3ea1e8

Please sign in to comment.