Skip to content

Commit

Permalink
feat(network): Add backoff delay logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gfanton committed Mar 20, 2019
1 parent 160b4c1 commit 63820f2
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 118 deletions.
2 changes: 1 addition & 1 deletion client/react-native/desktop/main.go
Expand Up @@ -9,7 +9,7 @@ import (
"berty.tech/core/pkg/logmanager"
"go.uber.org/zap"

"github.com/asticode/go-astilectron"
astilectron "github.com/asticode/go-astilectron"
bootstrap "github.com/asticode/go-astilectron-bootstrap"
)

Expand Down
1 change: 1 addition & 0 deletions core/api/client/jsonclient/logger.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/api/client/logger.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/cmd/berty/root.go
Expand Up @@ -56,7 +56,7 @@ func newRootCommand() *cobra.Command {
defaultJaegerName := os.Getenv("USER") + "@" + os.Getenv("HOST")
cmd.PersistentFlags().BoolP("help", "h", false, "Print usage")
cmd.PersistentFlags().StringP("log-level", "", "info", "log level (debug, info, warn, error)")
cmd.PersistentFlags().StringP("log-namespaces", "", "core.*,vendor.gorm*", "logger namespaces to enable (supports wildcard)")
cmd.PersistentFlags().StringP("log-namespaces", "", "core.*", "logger namespaces to enable (supports wildcard)")
cmd.PersistentFlags().StringP("log-dir", "", "/tmp/berty-logs", "local log files directory")
cmd.PersistentFlags().StringP("jaeger-address", "", "127.0.0.1:6831", "ip address / hostname and port of jaeger-agent: <hostname>:<port>")
cmd.PersistentFlags().StringP("jaeger-name", "", defaultJaegerName, "tracer name")
Expand Down
2 changes: 1 addition & 1 deletion core/network/driver.go
Expand Up @@ -254,7 +254,6 @@ func (net *Network) SendTo(ctx context.Context, pi pstore.PeerInfo, e *entity.En
}

func (net *Network) handleEnvelope(s inet.Stream) {
logger().Debug("receiving envelope")
if net.handler == nil {
logger().Error("handler is not set")
return
Expand All @@ -274,6 +273,7 @@ func (net *Network) handleEnvelope(s inet.Stream) {
return
}

logger().Debug("receiving envelope")
// @TODO: get opentracing context
net.handler(context.Background(), e)
}
Expand Down
53 changes: 53 additions & 0 deletions core/network/host/backoff.go
@@ -0,0 +1,53 @@
package host

import (
	"math/rand"
	"sync"
	"time"
)

const (
	// Factor is the multiplier applied to the delay for every retry.
	Factor = 1.6

	// Jitter is the maximum relative deviation (+/-) randomly applied to a
	// computed delay, so that requests starting at the same time won't
	// operate in lockstep.
	Jitter = 0.2
)

var (
	// rsource is the process-wide source used to derive a distinct seed for
	// every BackoffDelay. Sources returned by rand.NewSource are not safe
	// for concurrent use, so every access must hold rsourceMu.
	rsource   rand.Source
	rsourceMu sync.Mutex
)

func init() {
	rsource = rand.NewSource(time.Now().UnixNano())
}

// BackoffDelay computes exponentially growing, jittered retry delays between
// a base delay and a maximum delay.
//
// A BackoffDelay owns its random generator and is not safe for concurrent
// use; create one instance per retry loop.
type BackoffDelay struct {
	rand      *rand.Rand
	baseDelay time.Duration
	maxDelay  time.Duration
}

// NewBackoffDelay returns a BackoffDelay that starts at baseDelay and grows
// by Factor per retry up to maxDelay.
func NewBackoffDelay(baseDelay, maxDelay time.Duration) *BackoffDelay {
	// Give each instance a private source: sharing rsource directly between
	// instances used from different goroutines would be a data race.
	rsourceMu.Lock()
	seed := rsource.Int63()
	rsourceMu.Unlock()

	return &BackoffDelay{
		rand:      rand.New(rand.NewSource(seed)),
		baseDelay: baseDelay,
		maxDelay:  maxDelay,
	}
}

// Backoff returns the delay to wait before the attempt following the given
// number of retries.
//
// With retries == 0 the base delay is returned as is. Otherwise the base
// delay is multiplied by Factor once per retry, capped at the maximum delay,
// and then randomized by +/- Jitter. Because jitter is applied after the cap,
// the result may exceed the maximum delay by at most Jitter (20%). The
// returned duration is never negative.
func (b *BackoffDelay) Backoff(retries int) time.Duration {
	if retries == 0 {
		return b.baseDelay
	}

	backoff, limit := float64(b.baseDelay), float64(b.maxDelay)
	for backoff < limit && retries > 0 {
		backoff *= Factor
		retries--
	}

	// The last multiplication can overshoot the limit; clamp before jitter.
	if backoff > limit {
		backoff = limit
	}

	backoff *= 1 + Jitter*(b.rand.Float64()*2-1)
	if backoff < 0 {
		return 0
	}

	return time.Duration(backoff)
}
151 changes: 55 additions & 96 deletions core/network/host/connmanager.go
Expand Up @@ -30,6 +30,33 @@ func NewBertyConnMgr(ctx context.Context, low, hi int, grace time.Duration) *Ber
return cm
}

// reconnect keeps trying to dial pid until the peer is connected, a dial
// succeeds, or the manager's context is done. Before every attempt it waits
// for the delay computed by delay.Backoff for the current retry count.
func (cm *BertyConnMgr) reconnect(net inet.Network, pid peer.ID, delay *BackoffDelay) {
	for attempt := 0; ; attempt++ {
		// Wait out the backoff, giving up as soon as the context ends.
		select {
		case <-cm.ctx.Done():
			logger().Error("connmanager", zap.Error(cm.ctx.Err()))
			return
		case <-time.After(delay.Backoff(attempt)):
		}

		// Nothing to do if the peer is already connected again.
		if net.Connectedness(pid) == inet.Connected {
			return
		}

		logger().Debug("connmanager: try to reconnect", zap.String("id", pid.Pretty()), zap.Int("retries", attempt))
		_, dialErr := net.DialPeer(cm.ctx, pid)
		if dialErr == nil {
			return
		}

		logger().Debug("connmanager: cannot reconnect", zap.String("id", pid.Pretty()), zap.Error(dialErr))
	}
}

// TrimOpenConns forwards the call, with the same context, to the
// TrimOpenConns method of the manager's BasicConnMgr.
func (cm *BertyConnMgr) TrimOpenConns(ctx context.Context) {
cm.BasicConnMgr.TrimOpenConns(ctx)
}
Expand All @@ -50,6 +77,19 @@ func (cm *BertyConnMgr) GetInfo() connmgr.CMInfo {
return cm.BasicConnMgr.GetInfo()
}

// getRelayPeers is a placeholder: it prints "not implemented" to standard
// output and always returns nil.
// TODO: implement.
func (cm *BertyConnMgr) getRelayPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
}
// getBootstrapPeers is a placeholder: it prints "not implemented" to standard
// output and always returns nil.
// TODO: implement.
func (cm *BertyConnMgr) getBootstrapPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
}
// getKbucketPeers is a placeholder: it prints "not implemented" to standard
// output and always returns nil.
// TODO: implement.
func (cm *BertyConnMgr) getKbucketPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
}

// Notifee returns the connection manager itself as the inet.Notifiee to be
// registered for network events.
func (cm *BertyConnMgr) Notifee() inet.Notifiee {
return cm
}
Expand All @@ -60,104 +100,23 @@ func (cm *BertyConnMgr) Connected(net inet.Network, c inet.Conn) {
}

func (cm *BertyConnMgr) Disconnected(net inet.Network, c inet.Conn) {
// check if it a relay conn and try to reconnect
tagInfo := cm.GetTagInfo(c.RemotePeer())
peerID := c.RemotePeer()

// TODO: reconnect to kbucket && bootstrap && relay if list of each are < 1
logger().Debug("Disconnected", zap.String("tagInfo", fmt.Sprintf("%+v", tagInfo.Tags)))
if v, ok := tagInfo.Tags["kbucket"]; ok && v == 5 {
go func() {
for {
logger().Debug(
"connmanager: try to reconnect to kbucket",
zap.String("id", peerID.Pretty()),
)
if _, err := net.DialPeer(cm.ctx, peerID); err != nil {
logger().Debug(
"connmanager: cannot reconnect to kbucket",
zap.String("id", peerID.Pretty()),
zap.String("err", err.Error()),
)
select {
case <-time.After(time.Second * 10):
continue
case <-cm.ctx.Done():
cm.BasicConnMgr.Notifee().Disconnected(net, c)
return
}
}
break
}
}()
return
} else if v, ok := tagInfo.Tags["bootstrap"]; ok && v == 2 {
go func() {
for {
logger().Debug(
"connmanager: try to reconnect to bootstrap",
zap.String("id", peerID.Pretty()),
)
if _, err := net.DialPeer(cm.ctx, peerID); err != nil {
logger().Debug(
"connmanager: cannot reconnect to bootstrap",
zap.String("id", peerID.Pretty()),
zap.String("err", err.Error()),
)
select {
case <-time.After(time.Second * 1):
continue
case <-cm.ctx.Done():
cm.BasicConnMgr.Notifee().Disconnected(net, c)
return
}
}
break
}
}()
return
} else if v, ok := tagInfo.Tags["relay-hop"]; ok && v == 2 {
go func() {
for {
logger().Debug(
"connmanager: try to reconnect to relay",
zap.String("id", peerID.Pretty()),
)
if _, err := net.DialPeer(cm.ctx, peerID); err != nil {
logger().Debug(
"connmanager: cannot reconnect to relay",
zap.String("id", peerID.Pretty()),
zap.String("err", err.Error()),
)
select {
case <-time.After(time.Second * 10):
continue
case <-cm.ctx.Done():
cm.BasicConnMgr.Notifee().Disconnected(net, c)
return
}
}
break
}
}()
return
} else {
cm.BasicConnMgr.Notifee().Disconnected(net, c)
if net.Connectedness(c.RemotePeer()) != inet.Connected {
// check if it is a relay conn and try to reconnect
peerID := c.RemotePeer()
tagInfo := cm.GetTagInfo(peerID)

var delay *BackoffDelay

if v, ok := tagInfo.Tags["bootstrap"]; ok && v == 2 {
delay = NewBackoffDelay(time.Second, time.Second*10)
go cm.reconnect(net, peerID, delay)
} else if v, ok := tagInfo.Tags["relay-hop"]; ok && v == 2 {
delay = NewBackoffDelay(time.Second, time.Minute)
go cm.reconnect(net, peerID, delay)
}
}
}

// TODO
func (cm *BertyConnMgr) getRelayPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
}
func (cm *BertyConnMgr) getBootstrapPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
}
func (cm *BertyConnMgr) getKbucketPeers() []pstore.PeerInfo {
fmt.Printf("not implemented")
return nil
cm.BasicConnMgr.Notifee().Disconnected(net, c)
}

// Listen is no-op in this implementation.
Expand Down
1 change: 1 addition & 0 deletions core/network/host/discovery.go
Expand Up @@ -59,6 +59,7 @@ func (d *BertyDiscovery) Advertise(ctx context.Context, ns string, opts ...disco
for i := range d.discoveries {
<-waitChans[i]
}

time.Sleep(time.Second)
return time.Now().Sub(t), nil
}
Expand Down
23 changes: 8 additions & 15 deletions core/network/host/routing.go
Expand Up @@ -18,12 +18,6 @@ import (
"go.uber.org/zap"
)

const (
maxDelay = 5.0 * time.Second
baseDelay = 500.0 * time.Millisecond
factor = 1.2
)

var _ routing.IpfsRouting = (*BertyRouting)(nil)
var _ inet.Notifiee = (*BertyRouting)(nil)

Expand Down Expand Up @@ -132,13 +126,12 @@ func (br *BertyRouting) isReady(ctx context.Context) error {
}
}

func (br *BertyRouting) getBackoffDelay(currentDelay time.Duration) time.Duration {
delay := float64(currentDelay) * factor
if delay > float64(maxDelay) {
return maxDelay
func (br *BertyRouting) IsReady(ctx context.Context) bool {
if err := br.isReady(ctx); err != nil {
return false
}

return time.Duration(delay)
return true
}

func (br *BertyRouting) Listen(net inet.Network, a ma.Multiaddr) {}
Expand All @@ -158,13 +151,13 @@ func (br *BertyRouting) Connected(net inet.Network, c inet.Conn) {

ctx := context.Background()
if !br.ready {
delay := baseDelay
delay := NewBackoffDelay(500*time.Millisecond, 5*time.Second)
retries := 0
for !br.routing.IsReady(ctx) {
delay = br.getBackoffDelay(delay)

time.Sleep(delay.Backoff(retries))
// try again until the bucket is filled with at least
// one peer
<-time.After(delay)
retries++
}

t := time.Since(br.tstart)
Expand Down
16 changes: 16 additions & 0 deletions core/network/metric/peer.validate.gen.go~merged
@@ -0,0 +1,16 @@
<<<<<<< HEAD
// this file was generated by protoc-gen-gotemplate

package metric
||||||| merged common ancestors
=======
// this file was generated by protoc-gen-gotemplate

<<<<<<< HEAD:core/entity/kind.validate.gen.go
package entity
||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go
package pubsub
=======
package metric
>>>>>>> refactor(network): new archi with libp2p:core/network/metric/peer.validate.gen.go
>>>>>>> refactor(network): new archi with libp2p
16 changes: 16 additions & 0 deletions core/network/metric/peer.validate.gen.go~merged_0
@@ -0,0 +1,16 @@
<<<<<<< HEAD
// this file was generated by protoc-gen-gotemplate

package metric
||||||| merged common ancestors
=======
// this file was generated by protoc-gen-gotemplate

<<<<<<< HEAD:core/entity/kind.validate.gen.go
package entity
||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go
package pubsub
=======
package metric
>>>>>>> feat(p2p): Custom host with the capability to choose a specific conn on NewStream:core/network/metric/peer.validate.gen.go
>>>>>>> feat(p2p): Custom host with the capability to choose a specific conn on NewStream
16 changes: 16 additions & 0 deletions core/network/metric/peer.validate.gen.go~merged_1
@@ -0,0 +1,16 @@
<<<<<<< HEAD
// this file was generated by protoc-gen-gotemplate

package metric
||||||| merged common ancestors
=======
// this file was generated by protoc-gen-gotemplate

<<<<<<< HEAD:core/entity/kind.validate.gen.go
package entity
||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go
package pubsub
=======
package metric
>>>>>>> refactor(network): new archi with libp2p:core/network/metric/peer.validate.gen.go
>>>>>>> refactor(network): new archi with libp2p

0 comments on commit 63820f2

Please sign in to comment.