Skip to content

Commit

Permalink
refactor(kademlia): speeding up topology build (#2028)
Browse files Browse the repository at this point in the history
Speeds up the topology build by dividing the connect neighbor
method into two separate phases executed in sequential order.
The first phase makes a number of concurrent connections not
bigger than what is considered for the bin to be saturated. The
second phase takes care of the rest of the connection in a
regular manner.
  • Loading branch information
Peter Mrekaj committed Jun 10, 2021
1 parent 7383d3d commit 388256b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 77 deletions.
2 changes: 1 addition & 1 deletion pkg/topology/kademlia/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ var (
QuickSaturationPeers = &quickSaturationPeers
SaturationPeers = &saturationPeers
OverSaturationPeers = &overSaturationPeers
BootnodeOverSaturationPeers = &bootnodeOverSaturationPeers
BootnodeOverSaturationPeers = &bootNodeOverSaturationPeers
)
150 changes: 95 additions & 55 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (

const (
nnLowWatermark = 2 // the number of peers in consecutive deepest bins that constitute as nearest neighbours
maxConnAttempts = 3 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable
maxBootnodeAttempts = 3 // how many attempts to dial to bootnodes before giving up
maxConnAttempts = 1 // when there is maxConnAttempts failed connect calls for a given peer it is considered non-connectable
maxBootNodeAttempts = 3 // how many attempts to dial to boot-nodes before giving up
defaultBitSuffixLength = 3 // the number of bits used to create pseudo addresses for balancing

peerConnectionAttemptTimeout = 5 * time.Second // Timeout for establishing a new connection with peer.
Expand All @@ -42,7 +42,7 @@ var (
quickSaturationPeers = 4
saturationPeers = 8
overSaturationPeers = 20
bootnodeOverSaturationPeers = 20
bootNodeOverSaturationPeers = 20
shortRetry = 30 * time.Second
timeToRetry = 2 * shortRetry
broadcastBinSize = 4
Expand Down Expand Up @@ -111,7 +111,7 @@ func New(
if o.SaturationFunc == nil {
os := overSaturationPeers
if o.BootnodeMode {
os = bootnodeOverSaturationPeers
os = bootNodeOverSaturationPeers
}
o.SaturationFunc = binSaturated(os)
}
Expand Down Expand Up @@ -245,6 +245,9 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI
}

for i := range k.commonBinPrefixes {
if i >= int(k.NeighborhoodDepth()) {
continue
}
for j := range k.commonBinPrefixes[i] {
pseudoAddr := k.commonBinPrefixes[i][j]

Expand Down Expand Up @@ -298,15 +301,18 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI

// connectNeighbours attempts to connect to the neighbours
// which were not considered by the connectBalanced method.
func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerConnInfo) {
// The topology.EachPeerFunc doesn't return an error
// so we ignore the error returned from EachBinRev.
func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan, peerConnChan2 chan<- *peerConnInfo) {
const multiplePeerThreshold = 8

sent := 0
_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {

depth := k.NeighborhoodDepth()

if po < depth {
if depth > po || po >= depth+multiplePeerThreshold {
return false, true, nil
}

if len(k.connectedPeers.BinPeers(po)) >= overSaturationPeers-1 {
return false, true, nil
}

Expand All @@ -328,6 +334,43 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon
po: po,
addr: addr,
}
sent++
}

// We want to sent number of attempts equal to saturationPeers
// in order to speed up the topology build.
next := sent == saturationPeers
if next {
sent = 0
}
return false, next, nil
})

_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {
depth := k.NeighborhoodDepth()

if po < depth+multiplePeerThreshold {
return false, true, nil
}

if k.connectedPeers.Exists(addr) {
return false, false, nil
}

if k.waitNext.Waiting(addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
return false, false, nil
}

select {
case <-k.quit:
return true, false, nil
default:
wg.Add(1)
peerConnChan2 <- &peerConnInfo{
po: po,
addr: addr,
}
}

// The bin could be saturated or not, so a decision cannot
Expand All @@ -338,7 +381,7 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon

// connectionAttemptsHandler handles the connection attempts
// to peers sent by the producers to the peerConnChan.
func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, peerConnChan <-chan *peerConnInfo) {
func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup, peerConnChan, peerConnChan2 <-chan *peerConnInfo) {
connect := func(peer *peerConnInfo) {
bzzAddr, err := k.addressBook.Get(peer.addr)
switch {
Expand Down Expand Up @@ -396,34 +439,38 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
inProgress = make(map[string]bool)
inProgressMu sync.Mutex
)
for i := 0; i < int(swarm.MaxBins); i++ {
go func() {
for {
select {
case <-k.quit:
return
case peer := <-peerConnChan:
addr := peer.addr.String()
connAttempt := func(peerConnChan <-chan *peerConnInfo) {
for {
select {
case <-k.quit:
return
case peer := <-peerConnChan:
addr := peer.addr.String()

if k.waitNext.Waiting(peer.addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
wg.Done()
continue
}
if k.waitNext.Waiting(peer.addr) {
k.metrics.TotalBeforeExpireWaits.Inc()
wg.Done()
continue
}

inProgressMu.Lock()
if !inProgress[addr] {
inProgress[addr] = true
inProgressMu.Unlock()
connect(peer)
inProgressMu.Lock()
delete(inProgress, addr)
}
inProgressMu.Lock()
if !inProgress[addr] {
inProgress[addr] = true
inProgressMu.Unlock()
wg.Done()
connect(peer)
inProgressMu.Lock()
delete(inProgress, addr)
}
inProgressMu.Unlock()
wg.Done()
}
}()
}
}
for i := 0; i < 64; i++ {
go connAttempt(peerConnChan)
}
for i := 0; i < 8; i++ {
go connAttempt(peerConnChan2)
}
}

Expand Down Expand Up @@ -452,7 +499,8 @@ func (k *Kad) manage() {
// spun up by goroutines, to finish before we try the boot-nodes.
var wg sync.WaitGroup
var peerConnChan = make(chan *peerConnInfo)
go k.connectionAttemptsHandler(ctx, &wg, peerConnChan)
var peerConnChan2 = make(chan *peerConnInfo)
go k.connectionAttemptsHandler(ctx, &wg, peerConnChan, peerConnChan2)

for {
select {
Expand Down Expand Up @@ -484,8 +532,8 @@ func (k *Kad) manage() {
}

oldDepth := k.NeighborhoodDepth()
k.connectBalanced(&wg, peerConnChan)
k.connectNeighbours(&wg, peerConnChan)
k.connectNeighbours(&wg, peerConnChan, peerConnChan2)
k.connectBalanced(&wg, peerConnChan2)
wg.Wait()

k.depthMu.Lock()
Expand All @@ -512,13 +560,13 @@ func (k *Kad) manage() {
default:
}
k.logger.Debug("kademlia: no connected peers, trying bootnodes")
k.connectBootnodes(ctx)
k.connectBootNodes(ctx)
}
}
}
}

func (k *Kad) Start(ctx context.Context) error {
func (k *Kad) Start(_ context.Context) error {
k.wg.Add(1)
go k.manage()

Expand All @@ -531,9 +579,9 @@ func (k *Kad) Start(ctx context.Context) error {
return nil
}

func (k *Kad) connectBootnodes(ctx context.Context) {
func (k *Kad) connectBootNodes(ctx context.Context) {
var attempts, connected int
var totalAttempts = maxBootnodeAttempts * len(k.bootnodes)
var totalAttempts = maxBootNodeAttempts * len(k.bootnodes)

ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
Expand All @@ -545,7 +593,7 @@ func (k *Kad) connectBootnodes(ctx context.Context) {

if _, err := p2p.Discover(ctx, addr, func(addr ma.Multiaddr) (stop bool, err error) {
k.logger.Tracef("connecting to bootnode %s", addr)
if attempts >= maxBootnodeAttempts {
if attempts >= maxBootNodeAttempts {
return true, nil
}
bzzAddress, err := k.p2p.Connect(ctx, addr)
Expand Down Expand Up @@ -710,8 +758,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)

k.collector.Inspect(peer, func(ss *im.Snapshot) {
quickPrune := ss == nil || ss.HasAtMaxOneConnectionAttempt()

if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts > maxConnAttempts {
if (k.connectedPeers.Length() > 0 && quickPrune) || failedAttempts >= maxConnAttempts {
k.waitNext.Remove(peer)
k.knownPeers.Remove(peer)
if err := k.addressBook.Remove(peer); err != nil {
Expand All @@ -736,7 +783,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
// Announce a newly connected peer to our connected peers, but also
// notify the peer about our already connected peers
func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) error {
addrs := []swarm.Address{}
var addrs []swarm.Address

for bin := uint8(0); bin < swarm.MaxBins; bin++ {

Expand Down Expand Up @@ -805,30 +852,22 @@ func (k *Kad) Pick(peer p2p.Peer) bool {

// Connected is called when a peer has dialed in.
func (k *Kad) Connected(ctx context.Context, peer p2p.Peer) error {

address := peer.Address
po := swarm.Proximity(k.base.Bytes(), address.Bytes())

if _, overSaturated := k.saturationFunc(po, k.knownPeers, k.connectedPeers); overSaturated {

if k.bootnode {
randPeer, err := k.randomPeer(po)
if err != nil {
return err
}
_ = k.p2p.Disconnect(randPeer)
goto connected
return k.connected(ctx, address)
}

return topology.ErrOversaturated
}

connected:
if err := k.connected(ctx, address); err != nil {
return err
}
k.notifyManageLoop()
return nil
return k.connected(ctx, address)
}

func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
Expand All @@ -848,6 +887,7 @@ func (k *Kad) connected(ctx context.Context, addr swarm.Address) error {
k.depth = recalcDepth(k.connectedPeers, k.radius)
k.depthMu.Unlock()

k.notifyManageLoop()
k.notifyPeerSig()
return nil

Expand Down Expand Up @@ -1009,7 +1049,7 @@ func (k *Kad) IsWithinDepth(addr swarm.Address) bool {
return swarm.Proximity(k.base.Bytes(), addr.Bytes()) >= k.NeighborhoodDepth()
}

// // EachNeighbor iterates from closest bin to farthest of the neighborhood peers.
// EachNeighbor iterates from closest bin to farthest of the neighborhood peers.
func (k *Kad) EachNeighbor(f topology.EachPeerFunc) error {
depth := k.NeighborhoodDepth()
fn := func(a swarm.Address, po uint8) (bool, bool, error) {
Expand Down
31 changes: 10 additions & 21 deletions pkg/topology/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestNeighborhoodDepth(t *testing.T) {
kDepth(t, kad, i)
}

// add a whole bunch of peers in bin 15, expect depth to stay at 15
// add a whole bunch of peers in the last bin, expect depth to stay at 31
for i := 0; i < 15; i++ {
addr = test.RandomAddressAt(base, int(swarm.MaxPO))
addOne(t, signer, kad, ab, addr)
Expand Down Expand Up @@ -746,13 +746,10 @@ func TestAddressBookPrune(t *testing.T) {
waitCounter(t, &conns, 0)
waitCounter(t, &failedConns, 1)

p, err := ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}
if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}

addr := test.RandomAddressAt(base, 1)
addr1 := test.RandomAddressAt(base, 1)
Expand All @@ -761,37 +758,29 @@ func TestAddressBookPrune(t *testing.T) {
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)

p, err = ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}

if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}

time.Sleep(50 * time.Millisecond)
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr1)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)

p, err = ab.Get(nonConnPeer.Overlay)
if err != nil {
_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
t.Fatal(err)
}

if !nonConnPeer.Equal(p) {
t.Fatalf("expected %+v, got %+v", nonConnPeer, p)
}

time.Sleep(50 * time.Millisecond)
// add one valid peer to initiate the retry, check connection and failed connection counters
addOne(t, signer, kad, ab, addr2)
waitCounter(t, &conns, 1)
waitCounter(t, &failedConns, 1)
waitCounter(t, &failedConns, 0)

_, err = ab.Get(nonConnPeer.Overlay)
if err != addressbook.ErrNotFound {
Expand Down

0 comments on commit 388256b

Please sign in to comment.