Skip to content

Commit

Permalink
P2P: try to limit the connection number per IP address (#1623)
Browse files Browse the repository at this point in the history
 ** by default, MaxPeersPerIp is same as MaxPeers
 ** no restriction on TrustedNode
 ** add test case: TestOptionMaxPeersPerIp
  • Loading branch information
brilliant-lx committed May 25, 2023
1 parent 11d16df commit eaea77a
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 13 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ var (
utils.PruneAncientDataFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPeersPerIPFlag,
utils.MaxPendingPeersFlag,
utils.MiningEnabledFlag,
utils.MinerThreadsFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.DNSDiscoveryFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPeersPerIPFlag,
utils.MaxPendingPeersFlag,
utils.NATFlag,
utils.NoDiscoverFlag,
Expand Down
16 changes: 16 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,13 @@ var (
Usage: "Maximum number of network peers (network disabled if set to 0)",
Value: node.DefaultConfig.P2P.MaxPeers,
}

MaxPeersPerIPFlag = cli.IntFlag{
Name: "maxpeersperip",
Usage: "Maximum number of network peers from a single IP address, (default used if set to <= 0, which is same as MaxPeers)",
Value: node.DefaultConfig.P2P.MaxPeersPerIP,
}

MaxPendingPeersFlag = cli.IntFlag{
Name: "maxpendpeers",
Usage: "Maximum number of pending connection attempts (defaults used if set to 0)",
Expand Down Expand Up @@ -1282,6 +1289,15 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
cfg.MaxPeers = lightPeers
}
}
// if max peers per ip is not set, use max peers
if cfg.MaxPeersPerIP <= 0 {
cfg.MaxPeersPerIP = cfg.MaxPeers
}
// flag like: `--maxpeersperip 10` could override the setting in config.toml
if ctx.GlobalIsSet(MaxPeersPerIPFlag.Name) {
cfg.MaxPeersPerIP = ctx.GlobalInt(MaxPeersPerIPFlag.Name)
}

if !(lightClient || lightServer) {
lightPeers = 0
}
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func (s *Ethereum) Start() error {
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested
s.handler.Start(maxPeers)
s.handler.Start(maxPeers, s.p2pServer.MaxPeersPerIP)
return nil
}

Expand Down
51 changes: 48 additions & 3 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -140,6 +141,9 @@ type handler struct {
maliciousVoteMonitor *monitor.MaliciousVoteMonitor
chain *core.BlockChain
maxPeers int
maxPeersPerIP int
peersPerIP map[string]int
peerPerIPLock sync.Mutex

downloader *downloader.Downloader
blockFetcher *fetcher.BlockFetcher
Expand Down Expand Up @@ -186,6 +190,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
chain: config.Chain,
peers: config.PeerSet,
merger: config.Merger,
peersPerIP: make(map[string]int),
whitelist: config.Whitelist,
directBroadcast: config.DirectBroadcast,
diffSync: config.DiffSync,
Expand Down Expand Up @@ -387,11 +392,30 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
}
}
// Ignore maxPeers if this is a trusted peer
if !peer.Peer.Info().Network.Trusted {
peerInfo := peer.Peer.Info()
if !peerInfo.Network.Trusted {
if reject || h.peers.len() >= h.maxPeers {
return p2p.DiscTooManyPeers
}
}

remoteAddr := peerInfo.Network.RemoteAddress
indexIP := strings.LastIndex(remoteAddr, ":")
if indexIP == -1 {
// there could be no IP address, such as a pipe
peer.Log().Debug("runEthPeer", "no ip address, remoteAddress", remoteAddr)
} else if !peerInfo.Network.Trusted {
remoteIP := remoteAddr[:indexIP]
h.peerPerIPLock.Lock()
if num, ok := h.peersPerIP[remoteIP]; ok && num >= h.maxPeersPerIP {
h.peerPerIPLock.Unlock()
peer.Log().Info("The IP has too many peers", "ip", remoteIP, "maxPeersPerIP", h.maxPeersPerIP,
"name", peerInfo.Name, "Enode", peerInfo.Enode)
return p2p.DiscTooManyPeers
}
h.peersPerIP[remoteIP] = h.peersPerIP[remoteIP] + 1
h.peerPerIPLock.Unlock()
}
peer.Log().Debug("Ethereum peer connected", "name", peer.Name())

// Register the peer locally
Expand Down Expand Up @@ -626,11 +650,32 @@ func (h *handler) unregisterPeer(id string) {
if err := h.peers.unregisterPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err)
}

peerInfo := peer.Peer.Info()
remoteAddr := peerInfo.Network.RemoteAddress
indexIP := strings.LastIndex(remoteAddr, ":")
if indexIP == -1 {
// there could be no IP address, such as a pipe
peer.Log().Debug("unregisterPeer", "name", peerInfo.Name, "no ip address, remoteAddress", remoteAddr)
} else if !peerInfo.Network.Trusted {
remoteIP := remoteAddr[:indexIP]
h.peerPerIPLock.Lock()
if h.peersPerIP[remoteIP] <= 0 {
peer.Log().Error("unregisterPeer without record", "name", peerInfo.Name, "remoteAddress", remoteAddr)
} else {
h.peersPerIP[remoteIP] = h.peersPerIP[remoteIP] - 1
logger.Debug("unregisterPeer", "name", peerInfo.Name, "connectNum", h.peersPerIP[remoteIP])
if h.peersPerIP[remoteIP] == 0 {
delete(h.peersPerIP, remoteIP)
}
}
h.peerPerIPLock.Unlock()
}
}

func (h *handler) Start(maxPeers int) {
func (h *handler) Start(maxPeers int, maxPeersPerIP int) {
h.maxPeers = maxPeers

h.maxPeersPerIP = maxPeersPerIP
// broadcast transactions
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newTestBackendWithGenerator(blocks int) *testBackend {
BloomCache: 1,
Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()),
})
handler.Start(100)
handler.Start(100, 100)

txconfig := core.DefaultTxPoolConfig
txconfig.Journal = "" // Don't litter the disk with test journals
Expand Down
112 changes: 110 additions & 2 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"math/big"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -148,8 +150,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
BloomCache: 1,
})
)
ethNoFork.Start(1000)
ethProFork.Start(1000)
ethNoFork.Start(1000, 1000)
ethProFork.Start(1000, 1000)

// Clean up everything after ourselves
defer chainNoFork.Stop()
Expand Down Expand Up @@ -928,3 +930,109 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
}
}
}

func TestOptionMaxPeersPerIP(t *testing.T) {
t.Parallel()

handler := newTestHandler()
defer handler.close()
var (
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
wg = sync.WaitGroup{}
maxPeersPerIP = handler.handler.maxPeersPerIP
uniPort = 1000
)

tryFunc := func(tryNum int, ip1 string, ip2 string, trust bool, doneCh chan struct{}) {
// Create a source peer to send messages through and a sink handler to receive them
p2pSrc, p2pSink := p2p.MsgPipe()
defer p2pSrc.Close()
defer p2pSink.Close()

peer1 := p2p.NewPeerPipe(enode.ID{0}, "", nil, p2pSrc)
peer1.UpdateTestRemoteAddr(ip1 + strconv.Itoa(uniPort))
peer2 := p2p.NewPeerPipe(enode.ID{byte(uniPort)}, "", nil, p2pSink)
peer2.UpdateTestRemoteAddr(ip2 + strconv.Itoa(uniPort))
if trust {
peer2.UpdateTrustFlagTest()
}
uniPort++

src := eth.NewPeer(eth.ETH66, peer1, p2pSrc, handler.txpool)
sink := eth.NewPeer(eth.ETH66, peer2, p2pSink, handler.txpool)
defer src.Close()
defer sink.Close()

wg.Add(1)
go func(num int) {
err := handler.handler.runEthPeer(sink, func(peer *eth.Peer) error {
wg.Done()
<-doneCh
return nil
})
// err is nil, connection ok and it is closed by the doneCh
if err == nil {
if trust || num <= maxPeersPerIP {
return
}
// if num > maxPeersPerIP and not trust, should report: p2p.DiscTooManyPeers
t.Errorf("current num is %d, maxPeersPerIP is %d, should failed", num, maxPeersPerIP)
return
}
wg.Done()
if trust {
t.Errorf("trust node should not failed, num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err)
}
// err should be p2p.DiscTooManyPeers and num > maxPeersPerIP
if err == p2p.DiscTooManyPeers && num > maxPeersPerIP {
return
}

t.Errorf("current num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err)
}(tryNum)

if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// make sure runEthPeer execute one by one.
wg.Wait()
}

// case 1: normal case
doneCh1 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "1.2.3.11:", "1.2.3.22:", false, doneCh1)
}
close(doneCh1)

// case 2: once the previous connection was unregisterred, new connections with same IP can be accepted.
doneCh2 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "1.2.3.11:", "1.2.3.22:", false, doneCh2)
}
close(doneCh2)

// case 3: ipv6 address, like: [2001:db8::1]:80
doneCh3 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", false, doneCh3)
}
close(doneCh3)

// case 4: same as case 2, but for ipv6
doneCh4 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", false, doneCh4)
}
close(doneCh4)

// case 5: test trust node
doneCh5 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", true, doneCh5)
}
close(doneCh5)

}
2 changes: 1 addition & 1 deletion eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func newTestHandlerWithBlocks(blocks int) *testHandler {
Sync: downloader.SnapSync,
BloomCache: 1,
})
handler.Start(1000)
handler.Start(1000, 3)

return &testHandler{
db: db,
Expand Down
7 changes: 4 additions & 3 deletions node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ var DefaultConfig = Config{
WSModules: []string{"net", "web3"},
GraphQLVirtualHosts: []string{"localhost"},
P2P: p2p.Config{
ListenAddr: ":30303",
MaxPeers: 50,
NAT: nat.Any(),
ListenAddr: ":30303",
MaxPeers: 50,
MaxPeersPerIP: 0, // by default, it will be same as MaxPeers
NAT: nat.Any(),
},
}

Expand Down
19 changes: 17 additions & 2 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ type Peer struct {
disc chan DiscReason

// events receives message send / receive events if set
events *event.Feed
testPipe *MsgPipeRW // for testing
events *event.Feed
testPipe *MsgPipeRW // for testing
testRemoteAddr string // for testing
}

// NewPeer returns a peer for testing purposes.
Expand Down Expand Up @@ -203,9 +204,23 @@ func (p *Peer) RunningCap(protocol string, versions []uint) bool {

// RemoteAddr returns the remote address of the network connection.
func (p *Peer) RemoteAddr() net.Addr {
if len(p.testRemoteAddr) > 0 {
if addr, err := net.ResolveTCPAddr("tcp", p.testRemoteAddr); err == nil {
return addr
}
log.Warn("RemoteAddr", "invalid testRemoteAddr", p.testRemoteAddr)
}
return p.rw.fd.RemoteAddr()
}

func (p *Peer) UpdateTestRemoteAddr(addr string) { // test purpose only
p.testRemoteAddr = addr
}

func (p *Peer) UpdateTrustFlagTest() { // test purpose only
p.rw.set(trustedConn, true)
}

// LocalAddr returns the local address of the network connection.
func (p *Peer) LocalAddr() net.Addr {
return p.rw.fd.LocalAddr()
Expand Down
4 changes: 4 additions & 0 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type Config struct {
// connected. It must be greater than zero.
MaxPeers int

// MaxPeersPerIP is the maximum number of peers that can be
// connected from a single IP. It must be greater than zero.
MaxPeersPerIP int `toml:",omitempty"`

// MaxPendingPeers is the maximum number of peers that can be pending in the
// handshake phase, counted separately for inbound and outbound connections.
// Zero defaults to preset values.
Expand Down

0 comments on commit eaea77a

Please sign in to comment.