From 63820f24e6b3154983f35c5e79a2af23b9cd0f27 Mon Sep 17 00:00:00 2001 From: Guilhem Fanton Date: Mon, 18 Mar 2019 17:15:08 +0100 Subject: [PATCH] feat(network): Add backoff delay logic --- client/react-native/desktop/main.go | 2 +- core/api/client/jsonclient/logger.gen.go | 1 + core/api/client/logger.gen.go | 1 + core/cmd/berty/root.go | 2 +- core/network/driver.go | 2 +- core/network/host/backoff.go | 53 ++++++ core/network/host/connmanager.go | 151 +++++++----------- core/network/host/discovery.go | 1 + core/network/host/routing.go | 23 +-- .../metric/peer.validate.gen.go~merged | 16 ++ .../metric/peer.validate.gen.go~merged_0 | 16 ++ .../metric/peer.validate.gen.go~merged_1 | 16 ++ core/network/network.go | 9 +- core/node/mainloop.go | 2 +- 14 files changed, 177 insertions(+), 118 deletions(-) create mode 100644 core/network/host/backoff.go create mode 100644 core/network/metric/peer.validate.gen.go~merged create mode 100644 core/network/metric/peer.validate.gen.go~merged_0 create mode 100644 core/network/metric/peer.validate.gen.go~merged_1 diff --git a/client/react-native/desktop/main.go b/client/react-native/desktop/main.go index 0dca0a3bd9..bc1cf84935 100644 --- a/client/react-native/desktop/main.go +++ b/client/react-native/desktop/main.go @@ -9,7 +9,7 @@ import ( "berty.tech/core/pkg/logmanager" "go.uber.org/zap" - "github.com/asticode/go-astilectron" + astilectron "github.com/asticode/go-astilectron" bootstrap "github.com/asticode/go-astilectron-bootstrap" ) diff --git a/core/api/client/jsonclient/logger.gen.go b/core/api/client/jsonclient/logger.gen.go index fa5cd7d5a0..5a498381b8 100644 --- a/core/api/client/jsonclient/logger.gen.go +++ b/core/api/client/jsonclient/logger.gen.go @@ -1,4 +1,5 @@ // Code generated by berty.tech/core/.scripts/generate-logger.sh + package jsonclient import "go.uber.org/zap" diff --git a/core/api/client/logger.gen.go b/core/api/client/logger.gen.go index 4ad92854b5..57da3d740d 100644 --- a/core/api/client/logger.gen.go +++ b/core/api/client/logger.gen.go @@ -1,4 +1,5 @@ // Code generated by berty.tech/core/.scripts/generate-logger.sh + package client import "go.uber.org/zap" diff --git a/core/cmd/berty/root.go b/core/cmd/berty/root.go index 0a418a849b..8b61e345cd 100644 --- a/core/cmd/berty/root.go +++ b/core/cmd/berty/root.go @@ -56,7 +56,7 @@ func newRootCommand() *cobra.Command { defaultJaegerName := os.Getenv("USER") + "@" + os.Getenv("HOST") cmd.PersistentFlags().BoolP("help", "h", false, "Print usage") cmd.PersistentFlags().StringP("log-level", "", "info", "log level (debug, info, warn, error)") - cmd.PersistentFlags().StringP("log-namespaces", "", "core.*,vendor.gorm*", "logger namespaces to enable (supports wildcard)") + cmd.PersistentFlags().StringP("log-namespaces", "", "core.*", "logger namespaces to enable (supports wildcard)") cmd.PersistentFlags().StringP("log-dir", "", "/tmp/berty-logs", "local log files directory") cmd.PersistentFlags().StringP("jaeger-address", "", "127.0.0.1:6831", "ip address / hostname and port of jaeger-agent: :") cmd.PersistentFlags().StringP("jaeger-name", "", defaultJaegerName, "tracer name") diff --git a/core/network/driver.go b/core/network/driver.go index 0ba181c183..6e2a2ec91c 100644 --- a/core/network/driver.go +++ b/core/network/driver.go @@ -254,7 +254,6 @@ func (net *Network) SendTo(ctx context.Context, pi pstore.PeerInfo, e *entity.En } func (net *Network) handleEnvelope(s inet.Stream) { - logger().Debug("receiving envelope") if net.handler == nil { logger().Error("handler is not set") return @@ -274,6 +273,7 @@ func (net *Network) handleEnvelope(s inet.Stream) { return } + logger().Debug("receiving envelope") // @TODO: get opentracing context net.handler(context.Background(), e) } diff --git a/core/network/host/backoff.go b/core/network/host/backoff.go new file mode 100644 index 0000000000..bba0ead581 --- /dev/null +++ b/core/network/host/backoff.go @@ -0,0 +1,53 @@ +package host + +import ( + "math/rand" + "time" +) + +const ( + Factor = 1.6 + + // Randomize backoff delays so that if requests start at + // the same time, they won't operate in lockstep. + Jitter = 0.2 +) + +var rsource rand.Source + +func init() { + rsource = rand.NewSource(time.Now().UnixNano()) +} + +type BackoffDelay struct { + rand *rand.Rand + baseDelay time.Duration + maxDelay time.Duration +} + +func NewBackoffDelay(baseDelay, maxDelay time.Duration) *BackoffDelay { + return &BackoffDelay{ + rand: rand.New(rsource), + baseDelay: baseDelay, + maxDelay: maxDelay, + } +} + +func (b *BackoffDelay) Backoff(retries int) time.Duration { + if retries == 0 { + return b.baseDelay + } + + backoff, max := float64(b.baseDelay), float64(b.maxDelay) + for backoff < max && retries > 0 { + backoff *= Factor + retries-- + } + + backoff *= 1 + Jitter*(b.rand.Float64()*2-1) + if backoff < 0 { + return 0 + } + + return time.Duration(backoff) +} diff --git a/core/network/host/connmanager.go b/core/network/host/connmanager.go index 53d02bef34..9713617a34 100644 --- a/core/network/host/connmanager.go +++ b/core/network/host/connmanager.go @@ -30,6 +30,33 @@ func NewBertyConnMgr(ctx context.Context, low, hi int, grace time.Duration) *Ber return cm } +func (cm *BertyConnMgr) reconnect(net inet.Network, pid peer.ID, delay *BackoffDelay) { + retries := 0 + for { + select { + case <-time.After(delay.Backoff(retries)): + case <-cm.ctx.Done(): + logger().Error("connmanager", zap.Error(cm.ctx.Err())) + return + } + + if net.Connectedness(pid) == inet.Connected { + return + } + + logger().Debug("connmanager: try to reconnect", zap.String("id", pid.Pretty()), zap.Int("retries", retries)) + _, err := net.DialPeer(cm.ctx, pid) + if err == nil { + return + } + + logger().Debug("connmanager: cannot reconnect", zap.String("id", pid.Pretty()), zap.Error(err)) + + // update retries + retries++ + } +} + func (cm *BertyConnMgr) TrimOpenConns(ctx context.Context) { cm.BasicConnMgr.TrimOpenConns(ctx) } @@ -50,6 +77,19 @@ func (cm *BertyConnMgr) GetInfo() connmgr.CMInfo { return cm.BasicConnMgr.GetInfo() } +func (cm *BertyConnMgr) getRelayPeers() []pstore.PeerInfo { + fmt.Printf("not implemented") + return nil +} +func (cm *BertyConnMgr) getBootstrapPeers() []pstore.PeerInfo { + fmt.Printf("not implemented") + return nil +} +func (cm *BertyConnMgr) getKbucketPeers() []pstore.PeerInfo { + fmt.Printf("not implemented") + return nil +} + func (cm *BertyConnMgr) Notifee() inet.Notifiee { return cm } @@ -60,104 +100,23 @@ func (cm *BertyConnMgr) Connected(net inet.Network, c inet.Conn) { } func (cm *BertyConnMgr) Disconnected(net inet.Network, c inet.Conn) { - // check if it a relay conn and try to reconnect - tagInfo := cm.GetTagInfo(c.RemotePeer()) - peerID := c.RemotePeer() - - // TODO: reconnect to kbucket && bootstrap && relay if list of each are < 1 - logger().Debug("Disconnected", zap.String("tagInfo", fmt.Sprintf("%+v", tagInfo.Tags))) - if v, ok := tagInfo.Tags["kbucket"]; ok && v == 5 { - go func() { - for { - logger().Debug( - "connmanager: try to reconnect to kbucket", - zap.String("id", peerID.Pretty()), - ) - if _, err := net.DialPeer(cm.ctx, peerID); err != nil { - logger().Debug( - "connmanager: cannot reconnect to kbucket", - zap.String("id", peerID.Pretty()), - zap.String("err", err.Error()), - ) - select { - case <-time.After(time.Second * 10): - continue - case <-cm.ctx.Done(): - cm.BasicConnMgr.Notifee().Disconnected(net, c) - return - } - } - break - } - }() - return - } else if v, ok := tagInfo.Tags["bootstrap"]; ok && v == 2 { - go func() { - for { - logger().Debug( - "connmanager: try to reconnect to bootstrap", - zap.String("id", peerID.Pretty()), - ) - if _, err := net.DialPeer(cm.ctx, peerID); err != nil { - logger().Debug( - "connmanager: cannot reconnect to bootstrap", - zap.String("id", peerID.Pretty()), - zap.String("err", err.Error()), - ) - select { - case <-time.After(time.Second * 1): - continue - case <-cm.ctx.Done(): - cm.BasicConnMgr.Notifee().Disconnected(net, c) - return - } - } - break - } - }() - return - } else if v, ok := tagInfo.Tags["relay-hop"]; ok && v == 2 { - go func() { - for { - logger().Debug( - "connmanager: try to reconnect to relay", - zap.String("id", peerID.Pretty()), - ) - if _, err := net.DialPeer(cm.ctx, peerID); err != nil { - logger().Debug( - "connmanager: cannot reconnect to relay", - zap.String("id", peerID.Pretty()), - zap.String("err", err.Error()), - ) - select { - case <-time.After(time.Second * 10): - continue - case <-cm.ctx.Done(): - cm.BasicConnMgr.Notifee().Disconnected(net, c) - return - } - } - break - } - }() - return - } else { - cm.BasicConnMgr.Notifee().Disconnected(net, c) + if net.Connectedness(c.RemotePeer()) != inet.Connected { + // check if it a relay conn and try to reconnect + peerID := c.RemotePeer() + tagInfo := cm.GetTagInfo(peerID) + + var delay *BackoffDelay + + if v, ok := tagInfo.Tags["bootstrap"]; ok && v == 2 { + delay = NewBackoffDelay(time.Second, time.Second*10) + go cm.reconnect(net, peerID, delay) + } else if v, ok := tagInfo.Tags["relay-hop"]; ok && v == 2 { + delay = NewBackoffDelay(time.Second, time.Minute) + go cm.reconnect(net, peerID, delay) + } } -} -// TODO -func (cm *BertyConnMgr) getRelayPeers() []pstore.PeerInfo { - fmt.Printf("not implemented") - return nil -} -func (cm *BertyConnMgr) getBootstrapPeers() []pstore.PeerInfo { - fmt.Printf("not implemented") - return nil -} -func (cm *BertyConnMgr) getKbucketPeers() []pstore.PeerInfo { - fmt.Printf("not implemented") - return nil + cm.BasicConnMgr.Notifee().Disconnected(net, c) } // Listen is no-op in this implementation. diff --git a/core/network/host/discovery.go b/core/network/host/discovery.go index ba1cca6a13..2df76c84bd 100644 --- a/core/network/host/discovery.go +++ b/core/network/host/discovery.go @@ -59,6 +59,7 @@ func (d *BertyDiscovery) Advertise(ctx context.Context, ns string, opts ...disco for i := range d.discoveries { <-waitChans[i] } + time.Sleep(time.Second) return time.Now().Sub(t), nil } diff --git a/core/network/host/routing.go b/core/network/host/routing.go index be7df7bf93..0abf8bf244 100644 --- a/core/network/host/routing.go +++ b/core/network/host/routing.go @@ -18,12 +18,6 @@ import ( "go.uber.org/zap" ) -const ( - maxDelay = 5.0 * time.Second - baseDelay = 500.0 * time.Millisecond - factor = 1.2 -) - var _ routing.IpfsRouting = (*BertyRouting)(nil) var _ inet.Notifiee = (*BertyRouting)(nil) @@ -132,13 +126,12 @@ func (br *BertyRouting) isReady(ctx context.Context) error { } } -func (br *BertyRouting) getBackoffDelay(currentDelay time.Duration) time.Duration { - delay := float64(currentDelay) * factor - if delay > float64(maxDelay) { - return maxDelay +func (br *BertyRouting) IsReady(ctx context.Context) bool { + if err := br.isReady(ctx); err != nil { + return false } - return time.Duration(delay) + return true } func (br *BertyRouting) Listen(net inet.Network, a ma.Multiaddr) {} @@ -158,13 +151,13 @@ func (br *BertyRouting) Connected(net inet.Network, c inet.Conn) { ctx := context.Background() if !br.ready { - delay := baseDelay + delay := NewBackoffDelay(500*time.Millisecond, 5*time.Second) + retries := 0 for !br.routing.IsReady(ctx) { - delay = br.getBackoffDelay(delay) - + time.Sleep(delay.Backoff(retries)) // try again until bucket is fill with at last // one peer - <-time.After(delay) + retries++ } t := time.Since(br.tstart) diff --git a/core/network/metric/peer.validate.gen.go~merged b/core/network/metric/peer.validate.gen.go~merged new file mode 100644 index 0000000000..509a77749e --- /dev/null +++ b/core/network/metric/peer.validate.gen.go~merged @@ -0,0 +1,16 @@ +<<<<<<< HEAD +// this file was generated by protoc-gen-gotemplate + +package metric +||||||| merged common ancestors +======= +// this file was generated by protoc-gen-gotemplate + +<<<<<<< HEAD:core/entity/kind.validate.gen.go +package entity +||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go +package pubsub +======= +package metric +>>>>>>> refactor(network): new archi with libp2p:core/network/metric/peer.validate.gen.go +>>>>>>> refactor(network): new archi with libp2p diff --git a/core/network/metric/peer.validate.gen.go~merged_0 b/core/network/metric/peer.validate.gen.go~merged_0 new file mode 100644 index 0000000000..797f3451b4 --- /dev/null +++ b/core/network/metric/peer.validate.gen.go~merged_0 @@ -0,0 +1,16 @@ +<<<<<<< HEAD +// this file was generated by protoc-gen-gotemplate + +package metric +||||||| merged common ancestors +======= +// this file was generated by protoc-gen-gotemplate + +<<<<<<< HEAD:core/entity/kind.validate.gen.go +package entity +||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go +package pubsub +======= +package metric +>>>>>>> feat(p2p): Custom host with the capability to choose a specific conn on NewStream:core/network/metric/peer.validate.gen.go +>>>>>>> feat(p2p): Custom host with the capability to choose a specific conn on NewStream diff --git a/core/network/metric/peer.validate.gen.go~merged_1 b/core/network/metric/peer.validate.gen.go~merged_1 new file mode 100644 index 0000000000..509a77749e --- /dev/null +++ b/core/network/metric/peer.validate.gen.go~merged_1 @@ -0,0 +1,16 @@ +<<<<<<< HEAD +// this file was generated by protoc-gen-gotemplate + +package metric +||||||| merged common ancestors +======= +// this file was generated by protoc-gen-gotemplate + +<<<<<<< HEAD:core/entity/kind.validate.gen.go +package entity +||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go +package pubsub +======= +package metric +>>>>>>> refactor(network): new archi with libp2p:core/network/metric/peer.validate.gen.go +>>>>>>> refactor(network): new archi with libp2p diff --git a/core/network/network.go b/core/network/network.go index 837327b753..04c5ecbb2d 100644 --- a/core/network/network.go +++ b/core/network/network.go @@ -11,6 +11,10 @@ import ( "go.uber.org/zap" ) +const ( + MaxStreamUse = 20 +) + type Network struct { config *config.Config @@ -18,6 +22,8 @@ type Network struct { handler func(context.Context, *entity.Envelope) (*entity.Void, error) + streamManager *StreamManager + updating *sync.Mutex shutdown context.CancelFunc @@ -69,9 +75,6 @@ func (net *Network) init(ctx context.Context) { net.host.SetStreamHandler(ProtocolID, net.handleEnvelope) net.logHostInfos() - // advertise and find peers on berty discovery service - net.Discover(ctx) - // bootstrap default peers // TOOD: infinite bootstrap + don't permit routing to provide when no peers are discovered if err := net.Bootstrap(ctx, false, net.config.Bootstrap...); err != nil { diff --git a/core/node/mainloop.go b/core/node/mainloop.go index 7ad933d948..30b6ac0a61 100644 --- a/core/node/mainloop.go +++ b/core/node/mainloop.go @@ -151,7 +151,7 @@ func (n *Node) handleOutgoingEvent(ctx context.Context, event *entity.Event) { // if too long, the task will be done in background done := make(chan bool, 1) go func() { - tctx, cancel := context.WithTimeout(ctx, time.Second*10) + tctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() envCopy := envelope