Skip to content

Commit

Permalink
fix(bootstrap): parallelize bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton authored and 90dy committed Mar 8, 2019
1 parent 4f57117 commit fa247b8
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 42 deletions.
41 changes: 15 additions & 26 deletions core/network/driver.go
Expand Up @@ -98,42 +98,31 @@ func (net *Network) Peerstore(ctx context.Context) pstore.Peerstore {
return net.host.Peerstore()
}

func (net *Network) Bootstrap(ctx context.Context, sync bool, addrs ...string) error {
tracer := tracing.EnterFunc(ctx, sync, addrs)
func (net *Network) Bootstrap(ctx context.Context, bsync bool, addrs ...string) error {
tracer := tracing.EnterFunc(ctx, bsync, addrs)
defer tracer.Finish()
ctx = tracer.Context()

bf := net.BootstrapPeerAsync
if sync {
bf = net.BootstrapPeer
}
wg := sync.WaitGroup{}

var err error
for _, addr := range addrs {
err = nil
if err = bf(ctx, addr); err != nil {
logger().Error(err.Error())
continue
}
break
}
if err != nil {
return err
go func() {
wg.Add(1)
if err = net.BootstrapPeer(ctx, addr); err != nil {
logger().Error("bootstrap", zap.Error(err))
}
wg.Done()
}()
}

return nil
}

func (net *Network) BootstrapPeerAsync(ctx context.Context, addr string) error {
tracer := tracing.EnterFunc(ctx, addr)
defer tracer.Finish()
ctx = tracer.Context()

go func() {
if err := net.BootstrapPeer(ctx, addr); err != nil {
logger().Warn("Bootstrap error", zap.String("addr", addr), zap.Error(err))
if bsync {
wg.Wait()
if len(net.host.Peerstore().Peers()) == 0 {
return fmt.Errorf("bootstrap failed")
}
}()
}

return nil
}
Expand Down
20 changes: 19 additions & 1 deletion core/network/host/routing.go
Expand Up @@ -21,6 +21,12 @@ import (
"go.uber.org/zap"
)

const (
maxDelay = 5.0 * time.Second
baseDelay = 500.0 * time.Millisecond
factor = 1.2
)

var _ routing.IpfsRouting = (*BertyRouting)(nil)
var _ inet.Notifiee = (*BertyRouting)(nil)

Expand Down Expand Up @@ -149,6 +155,15 @@ func (br *BertyRouting) waitIsReady(ctx context.Context) {
}
}

func (br *BertyRouting) getBackoffDelay(currentDelay time.Duration) time.Duration {
delay := float64(currentDelay) * factor
if delay > float64(maxDelay) {
return maxDelay
}

return time.Duration(delay)
}

func (br *BertyRouting) Listen(net inet.Network, a ma.Multiaddr) {}
func (br *BertyRouting) ListenClose(net inet.Network, a ma.Multiaddr) {}
func (br *BertyRouting) OpenedStream(net inet.Network, s inet.Stream) {}
Expand All @@ -168,15 +183,18 @@ func (br *BertyRouting) Connected(net inet.Network, c inet.Conn) {
defer br.muReady.Unlock()

if !br.ready {
delay := baseDelay
for len(br.dht.RoutingTable().ListPeers()) == 0 {
if len(net.Conns()) == 0 {
logger().Warn("no conns available, aborting")
return
}

delay = br.getBackoffDelay(delay)

// try again until bucket is fill with at last
// one peer
<-time.After(500)
<-time.After(delay)
}

t := time.Since(br.tstart)
Expand Down
23 changes: 8 additions & 15 deletions core/network/test/network_test.go
Expand Up @@ -3,6 +3,7 @@ package test
import (
"context"
"testing"
"time"

"berty.tech/core/network"
"berty.tech/core/testrunner"
Expand All @@ -14,35 +15,27 @@ func init() {
}

func Test(t *testing.T) {
var (
rootContext = context.Background()
// TODO: Test to cancel context to check:
// https://github.com/libp2p/go-libp2p/blob/a4827ae9dda71d85f03fe6a5926194bfed2b2c71/libp2p.go#L52
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

Convey("network without options", t, FailureHalts, func() {
ctx, cancel := context.WithCancel(rootContext)
net, err := network.New(ctx)
So(err, ShouldNotBeNil)
So(net, ShouldBeNil)
cancel()
So(err, ShouldBeNil)
So(net, ShouldNotBeNil)

})

Convey("network with libp2p", t, FailureHalts, func() {
Convey("without option", FailureHalts, func() {
ctx, cancel := context.WithCancel(rootContext)
net, err := network.New(ctx)
So(err, ShouldNotBeNil)
So(net, ShouldBeNil)
cancel()
So(err, ShouldBeNil)
So(net, ShouldNotBeNil)
})

Convey("with default options", FailureHalts, func() {
ctx, cancel := context.WithCancel(rootContext)
net, err := network.New(ctx, network.WithDefaultOptions())
So(err, ShouldBeNil)
So(net, ShouldNotBeNil)
cancel()
})
})
}

0 comments on commit fa247b8

Please sign in to comment.