diff --git a/pkg/topology/kademlia/export_test.go b/pkg/topology/kademlia/export_test.go index e890c3e6768..40cfd156bce 100644 --- a/pkg/topology/kademlia/export_test.go +++ b/pkg/topology/kademlia/export_test.go @@ -9,5 +9,5 @@ var ( QuickSaturationPeers = &quickSaturationPeers SaturationPeers = &saturationPeers OverSaturationPeers = &overSaturationPeers - BootnodeOverSaturationPeers = &bootnodeOverSaturationPeers + BootnodeOverSaturationPeers = &bootNodeOverSaturationPeers ) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index ed40476eb8c..e16bc54b317 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -31,8 +31,8 @@ import ( const ( nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours - maxConnAttempts = 3 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable - maxBootnodeAttempts = 3 // how many attempts to dial to bootnodes before giving up + maxConnAttempts = 1 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable + maxBootNodeAttempts = 3 // how many attempts to dial to boot-nodes before giving up defaultBitSuffixLength = 3 // the number of bits used to create pseudo addresses for balancing peerConnectionAttemptTimeout = 5 * time.Second // Timeout for establishing a new connection with peer. @@ -42,7 +42,7 @@ var ( quickSaturationPeers = 4 saturationPeers = 8 overSaturationPeers = 20 - bootnodeOverSaturationPeers = 20 + bootNodeOverSaturationPeers = 20 shortRetry = 30 * time.Second timeToRetry = 2 * shortRetry broadcastBinSize = 4 @@ -111,7 +111,7 @@ func New( if o.SaturationFunc == nil { os := overSaturationPeers if o.BootnodeMode { - os = bootnodeOverSaturationPeers + os = bootNodeOverSaturationPeers } o.SaturationFunc = binSaturated(os) } @@ -245,6 +245,9 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI } for i := range k.commonBinPrefixes { + if i < int(k.NeighborhoodDepth()) { + continue + } for j := range k.commonBinPrefixes[i] { pseudoAddr := k.commonBinPrefixes[i][j] @@ -299,14 +302,50 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI // connectNeighbours attempts to connect to the neighbours // which were not considered by the connectBalanced method. func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) { - // The topology.EachPeerFunc doesn't return an error - // so we ignore the error returned from EachBinRev. + const multiplePeerThreshold = 4 + sent := 0 _ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) { + depth := k.NeighborhoodDepth() + + if depth > po || po >= depth+multiplePeerThreshold { + return false, true, nil + } + if k.connectedPeers.Exists(addr) { + return false, false, nil + } + + if k.waitNext.Waiting(addr) { + k.metrics.TotalBeforeExpireWaits.Inc() + return false, false, nil + } + + select { + case <-k.quit: + return true, false, nil + default: + wg.Add(1) + peerConnChan <- &peerConnInfo{ + po: po, + addr: addr, + } + sent++ + } + + // We want to sent number of attempts equal to saturationPeers + // in order to speed up the topology build. + next := sent == saturationPeers + if next { + sent = 0 + } + return false, next, nil + }) + + _ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) { depth := k.NeighborhoodDepth() - if po < depth { + if po < depth+multiplePeerThreshold { return false, true, nil } @@ -484,8 +523,8 @@ func (k *Kad) manage() { } oldDepth := k.NeighborhoodDepth() - k.connectBalanced(&wg, peerConnChan) k.connectNeighbours(&wg, peerConnChan) + k.connectBalanced(&wg, peerConnChan) wg.Wait() k.depthMu.Lock() @@ -512,13 +551,13 @@ func (k *Kad) manage() { default: } k.logger.Debug("kademlia: no connected peers, trying bootnodes") - k.connectBootnodes(ctx) + k.connectBootNodes(ctx) } } } } -func (k *Kad) Start(ctx context.Context) error { +func (k *Kad) Start(_ context.Context) error { k.wg.Add(1) go k.manage() @@ -531,9 +570,9 @@ func (k *Kad) Start(ctx context.Context) error { return nil } -func (k *Kad) connectBootnodes(ctx context.Context) { +func (k *Kad) connectBootNodes(ctx context.Context) { var attempts, connected int - var totalAttempts = maxBootnodeAttempts * len(k.bootnodes) + var totalAttempts = maxBootNodeAttempts * len(k.bootnodes) ctx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() @@ -545,7 +584,7 @@ func (k *Kad) connectBootnodes(ctx context.Context) { if _, err := p2p.Discover(ctx, addr, func(addr ma.Multiaddr) (stop bool, err error) { k.logger.Tracef("connecting to bootnode %s", addr) - if attempts >= maxBootnodeAttempts { + if attempts >= maxBootNodeAttempts { return true, nil } bzzAddress, err := k.p2p.Connect(ctx, addr) @@ -710,8 +749,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) k.collector.Inspect(peer, func(ss *im.Snapshot) { quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt() - - if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts { + if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts >= maxConnAttempts { k.waitNext.Remove(peer) k.knownPeers.Remove(peer) if err := k.addressBook.Remove(peer); err != nil { @@ -736,7 +774,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr) // Announce a newly connected peer to our connected peers, but also // notify the peer about our already connected peers func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) error { - addrs := []swarm.Address{} + var addrs []swarm.Address for bin := uint8(0); bin < swarm.MaxBins; bin++ { @@ -805,30 +843,22 @@ func (k *Kad) Pick(peer p2p.Peer) bool { // Connected is called when a peer has dialed in. func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error { - address := peer.Address po := swarm.Proximity(k.base.Bytes(), address.Bytes()) if _, overSaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); overSaturated { - if k.bootnode { randPeer, err := k.randomPeer(po) if err != nil { return err } _ = k.p2p.Disconnect(randPeer) - goto connected + return k.connected(ctx, address) } - return topology.ErrOversaturated } -connected: - if err := k.connected(ctx, address); err != nil { - return err - } - k.notifyManageLoop() - return nil + return k.connected(ctx, address) } func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { @@ -848,6 +878,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error { k.depth = recalcDepth(k.connectedPeers, k.radius) k.depthMu.Unlock() + k.notifyManageLoop() k.notifyPeerSig() return nil @@ -1009,7 +1040,7 @@ func (k *Kad) IsWithinDepth(addr swarm.Address) bool { return swarm.Proximity(k.base.Bytes(), addr.Bytes()) >= k.NeighborhoodDepth() } -// // EachNeighbor iterates from closest bin to farthest of the neighborhood peers. +// EachNeighbor iterates from closest bin to farthest of the neighborhood peers. func (k *Kad) EachNeighbor(f topology.EachPeerFunc) error { depth := k.NeighborhoodDepth() fn := func(a swarm.Address, po uint8) (bool, bool, error) { diff --git a/pkg/topology/kademlia/kademlia_test.go b/pkg/topology/kademlia/kademlia_test.go index 785e1466c49..77b120298f0 100644 --- a/pkg/topology/kademlia/kademlia_test.go +++ b/pkg/topology/kademlia/kademlia_test.go @@ -746,13 +746,10 @@ func TestAddressBookPrune(t *testing.T) { waitCounter(t, &conns, 0) waitCounter(t, &failedConns, 1) - p, err := ab.Get(nonConnPeer.Overlay) - if err != nil { + _, err = ab.Get(nonConnPeer.Overlay) + if err != addressbook.ErrNotFound { t.Fatal(err) } - if !nonConnPeer.Equal(p) { - t.Fatalf("expected %+v, got %+v", nonConnPeer, p) - } addr := test.RandomAddressAt(base, 1) addr1 := test.RandomAddressAt(base, 1) @@ -761,37 +758,29 @@ func TestAddressBookPrune(t *testing.T) { // add one valid peer to initiate the retry, check connection and failed connection counters addOne(t, signer, kad, ab, addr) waitCounter(t, &conns, 1) - waitCounter(t, &failedConns, 1) + waitCounter(t, &failedConns, 0) - p, err = ab.Get(nonConnPeer.Overlay) - if err != nil { + _, err = ab.Get(nonConnPeer.Overlay) + if err != addressbook.ErrNotFound { t.Fatal(err) } - if !nonConnPeer.Equal(p) { - t.Fatalf("expected %+v, got %+v", nonConnPeer, p) - } - time.Sleep(50 * time.Millisecond) // add one valid peer to initiate the retry, check connection and failed connection counters addOne(t, signer, kad, ab, addr1) waitCounter(t, &conns, 1) - waitCounter(t, &failedConns, 1) + waitCounter(t, &failedConns, 0) - p, err = ab.Get(nonConnPeer.Overlay) - if err != nil { + _, err = ab.Get(nonConnPeer.Overlay) + if err != addressbook.ErrNotFound { t.Fatal(err) } - if !nonConnPeer.Equal(p) { - t.Fatalf("expected %+v, got %+v", nonConnPeer, p) - } - time.Sleep(50 * time.Millisecond) // add one valid peer to initiate the retry, check connection and failed connection counters addOne(t, signer, kad, ab, addr2) waitCounter(t, &conns, 1) - waitCounter(t, &failedConns, 1) + waitCounter(t, &failedConns, 0) _, err = ab.Get(nonConnPeer.Overlay) if err != addressbook.ErrNotFound {