diff --git a/client/react-native/desktop/go.mod b/client/react-native/desktop/go.mod index b7c17c9e92..e15d046f90 100644 --- a/client/react-native/desktop/go.mod +++ b/client/react-native/desktop/go.mod @@ -15,7 +15,6 @@ require ( github.com/pkg/errors v0.8.1 github.com/sam-kamerer/go-plister v0.0.0-20190202124357-57f251aa88ff // indirect github.com/shibukawa/configdir v0.0.0-20170330084843-e180dbdc8da0 - github.com/spf13/afero v1.2.0 // indirect go.uber.org/zap v1.9.1 ) diff --git a/client/react-native/gomobile/go.mod b/client/react-native/gomobile/go.mod index a5c765af1b..6ea0866f3f 100644 --- a/client/react-native/gomobile/go.mod +++ b/client/react-native/gomobile/go.mod @@ -4,6 +4,7 @@ require ( berty.tech/core v0.0.0 github.com/libp2p/go-buffer-pool v0.1.2-0.20181009094743-058210c5a0d0 // indirect github.com/pkg/errors v0.8.1 + github.com/spf13/afero v1.2.0 // indirect go.uber.org/zap v1.9.1 ) diff --git a/core/api/client/jsonclient/logger.gen.go b/core/api/client/jsonclient/logger.gen.go index 5a498381b8..fa5cd7d5a0 100644 --- a/core/api/client/jsonclient/logger.gen.go +++ b/core/api/client/jsonclient/logger.gen.go @@ -1,5 +1,4 @@ // Code generated by berty.tech/core/.scripts/generate-logger.sh - package jsonclient import "go.uber.org/zap" diff --git a/core/api/client/logger.gen.go b/core/api/client/logger.gen.go index 57da3d740d..4ad92854b5 100644 --- a/core/api/client/logger.gen.go +++ b/core/api/client/logger.gen.go @@ -1,5 +1,4 @@ // Code generated by berty.tech/core/.scripts/generate-logger.sh - package client import "go.uber.org/zap" diff --git a/core/cmd/berty/daemon.go b/core/cmd/berty/daemon.go index 94c1f60e2c..a3e8d30b7e 100644 --- a/core/cmd/berty/daemon.go +++ b/core/cmd/berty/daemon.go @@ -36,6 +36,7 @@ type daemonOptions struct { // p2p + peerCache bool `mapstructure:"cache-peer"` identity string `mapstructure:"identity"` bootstrap []string `mapstructure:"bootstrap"` noP2P bool `mapstructure:"no-p2p"` @@ -74,6 +75,7 @@ func daemonSetupFlags(flags *pflag.FlagSet, opts *daemonOptions) { flags.BoolVar(&opts.dhtServer, "dht-server", false, "enable dht server") flags.BoolVar(&opts.ble, "ble", false, "enable ble transport") flags.BoolVar(&opts.PrivateNetwork, "private-network", true, "enable private network with the default swarm key") + flags.BoolVar(&opts.peerCache, "cache-peer", true, "if false, network will ask the dht every time he need to send an envelope (emit)") flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{}, "p2p listening address") flags.StringVar(&opts.SwarmKeyPath, "swarm-key", "", "path to a custom swarm key, only peers that use the same swarm key will be able to talk with you") // flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{"/ip4/0.0.0.0/tcp/0"}, "p2p listening address") @@ -167,6 +169,7 @@ func daemon(opts *daemonOptions) error { Identity: opts.identity, Persist: false, OverridePersist: false, + PeerCache: opts.peerCache, // DHTKVLogDatastore: opts.dhtkvLogDatastore, }), diff --git a/core/network/cache.go b/core/network/cache.go new file mode 100644 index 0000000000..0c70ce988a --- /dev/null +++ b/core/network/cache.go @@ -0,0 +1,139 @@ +package network + +import ( + "sync" + "time" + + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" +) + +type PeerCache interface { + GetPeerForKey(string) (pstore.PeerInfo, bool) + UpdateCache(string, pstore.PeerInfo) +} + +type cpeer struct { + key string + lastUpdate time.Time + connCount int + info pstore.PeerInfo +} + +type peerCache struct { + peers map[peer.ID]*cpeer + store map[string]*cpeer + muStore sync.RWMutex +} + +type noopCache struct{} + +func (nc *noopCache) GetPeerForKey(_ string) (_ pstore.PeerInfo, _ bool) { return } +func (nc *noopCache) UpdateCache(_ string, _ pstore.PeerInfo) {} + +func NewNoopCache() PeerCache { + return &noopCache{} +} + +func NewPeerCache(h host.Host) PeerCache { + pc := &peerCache{ + peers: make(map[peer.ID]*cpeer), + store: make(map[string]*cpeer), + } + + h.Network().Notify(pc) + return pc +} + +func (pc *peerCache) GetPeerForKey(key string) (pinfo pstore.PeerInfo, ok bool) { + pc.muStore.RLock() + defer pc.muStore.RUnlock() + + var p *cpeer + if p, ok = pc.store[key]; ok { + pinfo = p.info + } + + return +} + +func (pc *peerCache) UpdateCache(key string, pi pstore.PeerInfo) { + pc.muStore.Lock() + + p, ok := pc.peers[pi.ID] + if !ok { + p = &cpeer{ + key: key, + info: pi, + connCount: 0, + lastUpdate: time.Now(), + } + + pc.peers[pi.ID] = p + } + + p.key = key + p.info = pi + pc.store[key] = p + + pc.muStore.Unlock() +} + +func (pc *peerCache) Connected(net inet.Network, c inet.Conn) { + pc.muStore.Lock() + + peerID := c.RemotePeer() + p, ok := pc.peers[peerID] + if !ok { + p = &cpeer{ + connCount: 0, + lastUpdate: time.Now(), + } + + pc.peers[peerID] = p + } + + p.connCount++ + + pc.muStore.Unlock() +} + +func (pc *peerCache) Disconnected(net inet.Network, c inet.Conn) { + pc.muStore.Lock() + + peerID := c.RemotePeer() + if p, ok := pc.peers[peerID]; ok { + p.connCount-- + + if p.connCount == 0 { + if p.key != "" { + delete(pc.store, p.key) + } + + delete(pc.peers, peerID) + } + } + + pc.muStore.Unlock() +} + +// Listen is no-op in this implementation. +func (pc *peerCache) Listen(n inet.Network, addr ma.Multiaddr) { +} + +// ListenClose is no-op in this implementation. +func (pc *peerCache) ListenClose(n inet.Network, addr ma.Multiaddr) { + +} + +// OpenedStream is no-op in this implementation. +func (pc *peerCache) OpenedStream(n inet.Network, s inet.Stream) { + +} + +// ClosedStream is no-op in this implementation. +func (pc *peerCache) ClosedStream(n inet.Network, s inet.Stream) { +} diff --git a/core/network/config/config.go b/core/network/config/config.go index 32a19bbe0c..560ba22189 100644 --- a/core/network/config/config.go +++ b/core/network/config/config.go @@ -80,6 +80,8 @@ type Config struct { DefaultBootstrap bool Bootstrap []string + PeerCache bool + Ping bool Metric bool @@ -112,6 +114,7 @@ func (cfg *Config) Override(override *Config) error { cfg.HOP = override.HOP cfg.Identity = override.Identity cfg.SwarmKey = override.SwarmKey + cfg.PeerCache = override.PeerCache return nil } diff --git a/core/network/driver.go b/core/network/driver.go index 6e2a2ec91c..7e7818a124 100644 --- a/core/network/driver.go +++ b/core/network/driver.go @@ -212,24 +212,31 @@ func (net *Network) Emit(ctx context.Context, e *entity.Envelope) error { return net.EmitTo(ctx, e.GetChannelID(), e) } -func (net *Network) EmitTo(ctx context.Context, contactID string, e *entity.Envelope) error { +func (net *Network) EmitTo(ctx context.Context, contactID string, e *entity.Envelope) (err error) { tracer := tracing.EnterFunc(ctx, contactID, e) defer tracer.Finish() ctx = tracer.Context() - peerInfo, err := routing_validator.ContactIDToPeerInfo(ctx, net.host.Routing, contactID) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("EmitTo failed during contactID translation (%s)", contactID)) + pinfo, ok := net.cache.GetPeerForKey(contactID) + if !ok { + pinfo, err = routing_validator.ContactIDToPeerInfo(ctx, net.host.Routing, contactID) + if err != nil { + err = errors.Wrap(err, fmt.Sprintf("EmitTo failed during contactID translation (%s)", contactID)) + return + } + + net.cache.UpdateCache(contactID, pinfo) + // @TODO: we need to split this, and let the node do the logic to try + // back if the send fail with the given peer + } - // @TODO: we need to split this, and let the node do the logic to try - // back if the send fail with the given peer - if err = net.SendTo(ctx, peerInfo, e); err != nil { + if err = net.SendTo(ctx, pinfo, e); err != nil { logger().Warn("sendTo", zap.Error(err)) - return err + return } - return nil + return } func (net *Network) SendTo(ctx context.Context, pi pstore.PeerInfo, e *entity.Envelope) error { diff --git a/core/network/host/connmanager.go b/core/network/host/connmanager.go index 9713617a34..eb74ebbaec 100644 --- a/core/network/host/connmanager.go +++ b/core/network/host/connmanager.go @@ -40,10 +40,6 @@ func (cm *BertyConnMgr) reconnect(net inet.Network, pid peer.ID, delay *BackoffD return } - if net.Connectedness(pid) == inet.Connected { - return - } - logger().Debug("connmanager: try to reconnect", zap.String("id", pid.Pretty()), zap.Int("retries", retries)) _, err := net.DialPeer(cm.ctx, pid) if err == nil { diff --git a/core/network/metric/peer.validate.gen.go~merged b/core/network/metric/peer.validate.gen.go~merged deleted file mode 100644 index 509a77749e..0000000000 --- a/core/network/metric/peer.validate.gen.go~merged +++ /dev/null @@ -1,16 +0,0 @@ -<<<<<<< HEAD -// this file was generated by protoc-gen-gotemplate - -package metric -||||||| merged common ancestors -======= -// this file was generated by protoc-gen-gotemplate - -<<<<<<< HEAD:core/entity/kind.validate.gen.go -package entity -||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go -package pubsub -======= -package metric ->>>>>>> refactor(network): new archi with libp2p:core/network/metric/peer.validate.gen.go ->>>>>>> refactor(network): new archi with libp2p diff --git a/core/network/metric/peer.validate.gen.go~merged_0 b/core/network/metric/peer.validate.gen.go~merged_0 deleted file mode 100644 index 797f3451b4..0000000000 --- a/core/network/metric/peer.validate.gen.go~merged_0 +++ /dev/null @@ -1,16 +0,0 @@ -<<<<<<< HEAD -// this file was generated by protoc-gen-gotemplate - -package metric -||||||| merged common ancestors -======= -// this file was generated by protoc-gen-gotemplate - -<<<<<<< HEAD:core/entity/kind.validate.gen.go -package entity -||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go -package pubsub -======= -package metric ->>>>>>> feat(p2p): Custom host with the capability to choose a specific conn on NewStream:core/network/metric/peer.validate.gen.go ->>>>>>> feat(p2p): Custom host with the capability to choose a specific conn on NewStream diff --git a/core/network/metric/peer.validate.gen.go~merged_1 b/core/network/metric/peer.validate.gen.go~merged_1 deleted file mode 100644 index 509a77749e..0000000000 --- a/core/network/metric/peer.validate.gen.go~merged_1 +++ /dev/null @@ -1,16 +0,0 @@ -<<<<<<< HEAD -// this file was generated by protoc-gen-gotemplate - -package metric -||||||| merged common ancestors -======= -// this file was generated by protoc-gen-gotemplate - -<<<<<<< HEAD:core/entity/kind.validate.gen.go -package entity -||||||| merged common ancestors:core/network/p2p/protocol/provider/pubsub/pubsub.validate.gen.go -package pubsub -======= -package metric ->>>>>>> refactor(network): new archi with libp2p:core/network/metric/peer.validate.gen.go ->>>>>>> refactor(network): new archi with libp2p diff --git a/core/network/network.go b/core/network/network.go index 04c5ecbb2d..1177bcacda 100644 --- a/core/network/network.go +++ b/core/network/network.go @@ -11,10 +11,6 @@ import ( "go.uber.org/zap" ) -const ( - MaxStreamUse = 20 -) - type Network struct { config *config.Config @@ -22,11 +18,11 @@ type Network struct { handler func(context.Context, *entity.Envelope) (*entity.Void, error) - streamManager *StreamManager - updating *sync.Mutex shutdown context.CancelFunc + + cache PeerCache } // Chainconfig.Options chains multiple options into a single option. @@ -53,6 +49,7 @@ func New(ctx context.Context, opts ...config.Option) (*Network, error) { config: &config.Config{}, updating: &sync.Mutex{}, shutdown: cancel, + cache: NewNoopCache(), } if err := net.config.Apply(ctx, opts...); err != nil { @@ -66,6 +63,10 @@ func New(ctx context.Context, opts ...config.Option) (*Network, error) { return nil, err } + if net.Config().PeerCache { + net.cache = NewPeerCache(net.host) + } + net.init(ctx) return net, nil @@ -96,6 +97,7 @@ func (net *Network) Update(ctx context.Context, opts ...config.Option) error { updating: net.updating, handler: net.handler, shutdown: cancel, + cache: NewNoopCache(), } if err := update.config.Apply(ctx, append([]config.Option{WithConfig(net.config)}, opts...)...); err != nil { @@ -109,6 +111,10 @@ func (net *Network) Update(ctx context.Context, opts ...config.Option) error { return err } + if update.Config().PeerCache { + net.cache = NewPeerCache(update.host) + } + net.Close(ctx) *net = *update diff --git a/core/network/options.go b/core/network/options.go index 022db32dca..68c776800e 100644 --- a/core/network/options.go +++ b/core/network/options.go @@ -24,12 +24,14 @@ func WithClientOptions() config.Option { EnableMetric(), EnablePing(), EnableTCP(), + EnablePeerCache(), ) } func WithServerOptions() config.Option { return ChainOptions( DisablePersistConfig(), + DisablePeerCache(), EnableDHTServer(), EnableDefaultBind(), EnableHOP(), @@ -315,3 +317,17 @@ func DisableQUIC() config.Option { return nil } } + +func EnablePeerCache() config.Option { + return func(cfg *config.Config) error { + cfg.PeerCache = true + return nil + } +} + +func DisablePeerCache() config.Option { + return func(cfg *config.Config) error { + cfg.PeerCache = false + return nil + } +} diff --git a/core/node/mainloop.go b/core/node/mainloop.go index 30b6ac0a61..7ad933d948 100644 --- a/core/node/mainloop.go +++ b/core/node/mainloop.go @@ -151,7 +151,7 @@ func (n *Node) handleOutgoingEvent(ctx context.Context, event *entity.Event) { // if too long, the task will be done in background done := make(chan bool, 1) go func() { - tctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + tctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() envCopy := envelope