Skip to content

Commit

Permalink
refactor(kademlia): speeding up topology build
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Mrekaj committed Jun 8, 2021
1 parent 1890f32 commit 4bee71b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pkg/topology/kademlia/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ var (
QuickSaturationPeers = &quickSaturationPeers
SaturationPeers = &saturationPeers
OverSaturationPeers = &overSaturationPeers
BootnodeOverSaturationPeers = &bootnodeOverSaturationPeers
BootnodeOverSaturationPeers = &bootNodeOverSaturationPeers
)
85 changes: 58 additions & 27 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (

const (
nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours
maxConnAttempts = 3 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable
maxBootnodeAttempts = 3 // how many attempts to dial to bootnodes before giving up
maxConnAttempts = 1 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable
maxBootNodeAttempts = 3 // how many attempts to dial to boot-nodes before giving up
defaultBitSuffixLength = 3 // the number of bits used to create pseudo addresses for balancing

peerConnectionAttemptTimeout = 5 * time.Second // Timeout for establishing a new connection with peer.
Expand All @@ -42,7 +42,7 @@ var (
quickSaturationPeers = 4
saturationPeers = 8
overSaturationPeers = 20
bootnodeOverSaturationPeers = 20
bootNodeOverSaturationPeers = 20
shortRetry = 30 * time.Second
timeToRetry = 2 * shortRetry
broadcastBinSize = 4
Expand Down Expand Up @@ -111,7 +111,7 @@ func New(
if o.SaturationFunc == nil {
os := overSaturationPeers
if o.BootnodeMode {
os = bootnodeOverSaturationPeers
os = bootNodeOverSaturationPeers
}
o.SaturationFunc = binSaturated(os)
}
Expand Down Expand Up @@ -245,6 +245,9 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI
}

for i := range k.commonBinPrefixes {
if i < int(k.NeighborhoodDepth()) {
continue
}
for j := range k.commonBinPrefixes[i] {
pseudoAddr := k.commonBinPrefixes[i][j]

Expand Down Expand Up @@ -299,14 +302,50 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI
// connectNeighbours attempts to connect to the neighbours
// which were not considered by the connectBalanced method.
func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
// The topology.EachPeerFunc doesn't return an error
// so we ignore the error returned from EachBinRev.
const multiplePeerThreshold = 4

sent := 0
_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {
depth := k.NeighborhoodDepth()

if depth > po || po >= depth+multiplePeerThreshold {
return false, true, nil
}

if k.connectedPeers.Exists(addr) {
return false, false, nil
}

if k.waitNext.Waiting(addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
return false, false, nil
}

select {
case <-k.quit:
return true, false, nil
default:
wg.Add(1)
peerConnChan <- &peerConnInfo{
po: po,
addr: addr,
}
sent++
}

// We want to send a number of attempts equal to saturationPeers
// in order to speed up the topology build.
next := sent == saturationPeers
if next {
sent = 0
}
return false, next, nil
})

_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {
depth := k.NeighborhoodDepth()

if po < depth {
if po < depth+multiplePeerThreshold {
return false, true, nil
}

Expand Down Expand Up @@ -484,8 +523,8 @@ func (k *Kad) manage() {
}

oldDepth := k.NeighborhoodDepth()
k.connectBalanced(&wg, peerConnChan)
k.connectNeighbours(&wg, peerConnChan)
k.connectBalanced(&wg, peerConnChan)
wg.Wait()

k.depthMu.Lock()
Expand All @@ -512,13 +551,13 @@ func (k *Kad) manage() {
default:
}
k.logger.Debug("kademlia: no connected peers, trying bootnodes")
k.connectBootnodes(ctx)
k.connectBootNodes(ctx)
}
}
}
}

func (k *Kad) Start(ctx context.Context) error {
func (k *Kad) Start(_ context.Context) error {
k.wg.Add(1)
go k.manage()

Expand All @@ -531,9 +570,9 @@ func (k *Kad) Start(ctx context.Context) error {
return nil
}

func (k *Kad) connectBootnodes(ctx context.Context) {
func (k *Kad) connectBootNodes(ctx context.Context) {
var attempts, connected int
var totalAttempts = maxBootnodeAttempts * len(k.bootnodes)
var totalAttempts = maxBootNodeAttempts * len(k.bootnodes)

ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
Expand All @@ -545,7 +584,7 @@ func (k *Kad) connectBootnodes(ctx context.Context) {

if _, err := p2p.Discover(ctx, addr, func(addr ma.Multiaddr) (stop bool, err error) {
k.logger.Tracef("connecting to bootnode %s", addr)
if attempts >= maxBootnodeAttempts {
if attempts >= maxBootNodeAttempts {
return true, nil
}
bzzAddress, err := k.p2p.Connect(ctx, addr)
Expand Down Expand Up @@ -710,8 +749,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)

k.collector.Inspect(peer, func(ss *im.Snapshot) {
quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()

if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts >= maxConnAttempts {
k.waitNext.Remove(peer)
k.knownPeers.Remove(peer)
if err := k.addressBook.Remove(peer); err != nil {
Expand All @@ -736,7 +774,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
// Announce a newly connected peer to our connected peers, but also
// notify the peer about our already connected peers
func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) error {
addrs := []swarm.Address{}
var addrs []swarm.Address

for bin := uint8(0); bin < swarm.MaxBins; bin++ {

Expand Down Expand Up @@ -805,30 +843,22 @@ func (k *Kad) Pick(peer p2p.Peer) bool {

// Connected is called when a peer has dialed in.
func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error {

address := peer.Address
po := swarm.Proximity(k.base.Bytes(), address.Bytes())

if _, overSaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); overSaturated {

if k.bootnode {
randPeer, err := k.randomPeer(po)
if err != nil {
return err
}
_ = k.p2p.Disconnect(randPeer)
goto connected
return k.connected(ctx, address)
}

return topology.ErrOversaturated
}

connected:
if err := k.connected(ctx, address); err != nil {
return err
}
k.notifyManageLoop()
return nil
return k.connected(ctx, address)
}

func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
Expand All @@ -848,6 +878,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

k.notifyManageLoop()
k.notifyPeerSig()
return nil

Expand Down Expand Up @@ -1009,7 +1040,7 @@ func (k *Kad) IsWithinDepth(addr swarm.Address) bool {
return swarm.Proximity(k.base.Bytes(), addr.Bytes()) >= k.NeighborhoodDepth()
}

// // EachNeighbor iterates from closest bin to farthest of the neighborhood peers.
// EachNeighbor iterates from closest bin to farthest of the neighborhood peers.
func (k *Kad) EachNeighbor(f topology.EachPeerFunc) error {
depth := k.NeighborhoodDepth()
fn := func(a swarm.Address, po uint8) (bool, bool, error) {
Expand Down
29 changes: 9 additions & 20 deletions pkg/topology/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,13 +746,10 @@ func TestAddressBookPrune(t *testing.T) {
waitCounter(t, &conns, 0)
waitCounter(t, &failedConns, 1)

p, err := ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}
if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}

addr := test.RandomAddressAt(base, 1)
addr1 := test.RandomAddressAt(base, 1)
Expand All @@ -761,37 +758,29 @@ func TestAddressBookPrune(t *testing.T) {
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)

p, err = ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}

if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}

time.Sleep(50 * time.Millisecond)
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr1)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)

p, err = ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}

if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}

time.Sleep(50 * time.Millisecond)
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr2)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)

_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
Expand Down

0 comments on commit 4bee71b

Please sign in to comment.