feat(network): Cache peer
gfanton committed Mar 20, 2019
1 parent 63820f2 commit 908771d
Showing 15 changed files with 191 additions and 71 deletions.
1 change: 0 additions & 1 deletion client/react-native/desktop/go.mod
@@ -15,7 +15,6 @@ require (
github.com/pkg/errors v0.8.1
github.com/sam-kamerer/go-plister v0.0.0-20190202124357-57f251aa88ff // indirect
github.com/shibukawa/configdir v0.0.0-20170330084843-e180dbdc8da0
github.com/spf13/afero v1.2.0 // indirect
go.uber.org/zap v1.9.1
)

1 change: 1 addition & 0 deletions client/react-native/gomobile/go.mod
@@ -4,6 +4,7 @@ require (
berty.tech/core v0.0.0
github.com/libp2p/go-buffer-pool v0.1.2-0.20181009094743-058210c5a0d0 // indirect
github.com/pkg/errors v0.8.1
github.com/spf13/afero v1.2.0 // indirect
go.uber.org/zap v1.9.1
)

1 change: 0 additions & 1 deletion core/api/client/jsonclient/logger.gen.go

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion core/api/client/logger.gen.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions core/cmd/berty/daemon.go
@@ -36,6 +36,7 @@ type daemonOptions struct {

// p2p

peerCache bool `mapstructure:"cache-peer"`
identity string `mapstructure:"identity"`
bootstrap []string `mapstructure:"bootstrap"`
noP2P bool `mapstructure:"no-p2p"`
@@ -74,6 +75,7 @@ func daemonSetupFlags(flags *pflag.FlagSet, opts *daemonOptions) {
flags.BoolVar(&opts.dhtServer, "dht-server", false, "enable dht server")
flags.BoolVar(&opts.ble, "ble", false, "enable ble transport")
flags.BoolVar(&opts.PrivateNetwork, "private-network", true, "enable private network with the default swarm key")
flags.BoolVar(&opts.peerCache, "cache-peer", true, "if false, the network will query the DHT every time it needs to send an envelope (emit)")
flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{}, "p2p listening address")
flags.StringVar(&opts.SwarmKeyPath, "swarm-key", "", "path to a custom swarm key, only peers that use the same swarm key will be able to talk with you")
// flags.StringSliceVar(&opts.bindP2P, "bind-p2p", []string{"/ip4/0.0.0.0/tcp/0"}, "p2p listening address")
@@ -167,6 +169,7 @@ func daemon(opts *daemonOptions) error {
Identity: opts.identity,
Persist: false,
OverridePersist: false,
PeerCache: opts.peerCache,

// DHTKVLogDatastore: opts.dhtkvLogDatastore,
}),
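For reference, a minimal standalone sketch (not from this commit) of how the new flag behaves with pflag: it defaults to true, so the peer cache stays on unless `--cache-peer=false` is passed explicitly.

```go
package main

import (
	"fmt"

	flag "github.com/spf13/pflag"
)

func main() {
	// Hypothetical reproduction of the daemon's flag wiring.
	var peerCache bool
	flags := flag.NewFlagSet("berty-daemon", flag.ContinueOnError)
	flags.BoolVar(&peerCache, "cache-peer", true,
		"if false, the network will query the DHT every time it needs to send an envelope (emit)")

	// Passing --cache-peer=false disables the cache (the noop cache is used instead).
	_ = flags.Parse([]string{"--cache-peer=false"})
	fmt.Println("peer cache enabled:", peerCache) // prints: peer cache enabled: false
}
```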
139 changes: 139 additions & 0 deletions core/network/cache.go
@@ -0,0 +1,139 @@
package network

import (
"sync"
"time"

host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)

// PeerCache caches the PeerInfo last resolved for a contact key, so that Emit
// can skip the DHT lookup while the peer is still reachable.
type PeerCache interface {
GetPeerForKey(string) (pstore.PeerInfo, bool)
UpdateCache(string, pstore.PeerInfo)
}

// cpeer is a cache entry: the contact key, the cached PeerInfo, the number of
// live connections to the peer and the time of the last update.
type cpeer struct {
key string
lastUpdate time.Time
connCount int
info pstore.PeerInfo
}

type peerCache struct {
peers map[peer.ID]*cpeer
store map[string]*cpeer
muStore sync.RWMutex
}

// noopCache implements PeerCache but never stores anything; it is used when
// the peer cache is disabled (--cache-peer=false).
type noopCache struct{}

func (nc *noopCache) GetPeerForKey(_ string) (_ pstore.PeerInfo, _ bool) { return }
func (nc *noopCache) UpdateCache(_ string, _ pstore.PeerInfo) {}

func NewNoopCache() PeerCache {
return &noopCache{}
}

// NewPeerCache returns a PeerCache backed by the given host; it registers
// itself as a network notifiee to track connections and disconnections.
func NewPeerCache(h host.Host) PeerCache {
pc := &peerCache{
peers: make(map[peer.ID]*cpeer),
store: make(map[string]*cpeer),
}

h.Network().Notify(pc)
return pc
}

func (pc *peerCache) GetPeerForKey(key string) (pinfo pstore.PeerInfo, ok bool) {
pc.muStore.RLock()
defer pc.muStore.RUnlock()

var p *cpeer
if p, ok = pc.store[key]; ok {
pinfo = p.info
}

return
}

func (pc *peerCache) UpdateCache(key string, pi pstore.PeerInfo) {
pc.muStore.Lock()

p, ok := pc.peers[pi.ID]
if !ok {
p = &cpeer{
key: key,
info: pi,
connCount: 0,
lastUpdate: time.Now(),
}

pc.peers[pi.ID] = p
}

p.key = key
p.info = pi
pc.store[key] = p

pc.muStore.Unlock()
}

// Connected (inet.Notifiee) increments the connection count for the remote peer.
func (pc *peerCache) Connected(net inet.Network, c inet.Conn) {
pc.muStore.Lock()

peerID := c.RemotePeer()
p, ok := pc.peers[peerID]
if !ok {
p = &cpeer{
connCount: 0,
lastUpdate: time.Now(),
}

pc.peers[peerID] = p
}

p.connCount++

pc.muStore.Unlock()
}

// Disconnected (inet.Notifiee) decrements the connection count and evicts the
// entry once the last connection to the peer is closed.
func (pc *peerCache) Disconnected(net inet.Network, c inet.Conn) {
pc.muStore.Lock()

peerID := c.RemotePeer()
if p, ok := pc.peers[peerID]; ok {
p.connCount--

if p.connCount == 0 {
if p.key != "" {
delete(pc.store, p.key)
}

delete(pc.peers, peerID)
}
}

pc.muStore.Unlock()
}

// Listen is a no-op in this implementation.
func (pc *peerCache) Listen(n inet.Network, addr ma.Multiaddr) {}

// ListenClose is a no-op in this implementation.
func (pc *peerCache) ListenClose(n inet.Network, addr ma.Multiaddr) {}

// OpenedStream is a no-op in this implementation.
func (pc *peerCache) OpenedStream(n inet.Network, s inet.Stream) {}

// ClosedStream is a no-op in this implementation.
func (pc *peerCache) ClosedStream(n inet.Network, s inet.Stream) {}
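To make the cache's contract concrete, here is a minimal sketch in the style of a package-level test (hypothetical, not part of this commit): UpdateCache stores the PeerInfo under the contact key, GetPeerForKey returns it on a hit and reports a miss otherwise.

```go
package network

import (
	"testing"

	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
)

func TestPeerCacheRoundTrip(t *testing.T) {
	// Build the cache directly instead of via NewPeerCache, so no libp2p host is needed.
	pc := &peerCache{
		peers: make(map[peer.ID]*cpeer),
		store: make(map[string]*cpeer),
	}

	pi := pstore.PeerInfo{ID: peer.ID("QmExamplePeer")}
	pc.UpdateCache("contact-id", pi)

	if got, ok := pc.GetPeerForKey("contact-id"); !ok || got.ID != pi.ID {
		t.Fatalf("expected a cache hit for contact-id, got ok=%v", ok)
	}

	if _, ok := pc.GetPeerForKey("unknown-contact"); ok {
		t.Fatal("expected a cache miss for an unknown key")
	}
}
```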
3 changes: 3 additions & 0 deletions core/network/config/config.go
@@ -80,6 +80,8 @@ type Config struct {
DefaultBootstrap bool
Bootstrap []string

PeerCache bool

Ping bool

Metric bool
@@ -112,6 +114,7 @@ func (cfg *Config) Override(override *Config) error {
cfg.HOP = override.HOP
cfg.Identity = override.Identity
cfg.SwarmKey = override.SwarmKey
cfg.PeerCache = override.PeerCache
return nil
}

25 changes: 16 additions & 9 deletions core/network/driver.go
@@ -212,24 +212,31 @@ func (net *Network) Emit(ctx context.Context, e *entity.Envelope) error {
return net.EmitTo(ctx, e.GetChannelID(), e)
}

func (net *Network) EmitTo(ctx context.Context, contactID string, e *entity.Envelope) error {
func (net *Network) EmitTo(ctx context.Context, contactID string, e *entity.Envelope) (err error) {
tracer := tracing.EnterFunc(ctx, contactID, e)
defer tracer.Finish()
ctx = tracer.Context()

peerInfo, err := routing_validator.ContactIDToPeerInfo(ctx, net.host.Routing, contactID)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("EmitTo failed during contactID translation (%s)", contactID))
pinfo, ok := net.cache.GetPeerForKey(contactID)
if !ok {
pinfo, err = routing_validator.ContactIDToPeerInfo(ctx, net.host.Routing, contactID)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("EmitTo failed during contactID translation (%s)", contactID))
return
}

net.cache.UpdateCache(contactID, pinfo)
}

// @TODO: we need to split this, and let the node do the logic to retry
// if the send fails with the given peer

if err = net.SendTo(ctx, peerInfo, e); err != nil {
if err = net.SendTo(ctx, pinfo, e); err != nil {
logger().Warn("sendTo", zap.Error(err))
return err
return
}

return nil
return
}

func (net *Network) SendTo(ctx context.Context, pi pstore.PeerInfo, e *entity.Envelope) error {
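The new EmitTo flow boils down to a cache-first lookup. Below is a minimal sketch of that pattern under the same imports, with `resolve` standing in for `routing_validator.ContactIDToPeerInfo`; the helper itself is hypothetical and not part of the commit.

```go
package network

import (
	"context"

	pstore "github.com/libp2p/go-libp2p-peerstore"
)

// lookupPeer consults the cache first and only falls back to the (slow)
// routing lookup on a miss, caching the result for the next emit.
func lookupPeer(ctx context.Context, cache PeerCache, contactID string,
	resolve func(context.Context, string) (pstore.PeerInfo, error)) (pstore.PeerInfo, error) {

	if pi, ok := cache.GetPeerForKey(contactID); ok {
		return pi, nil // fast path: no DHT query
	}

	pi, err := resolve(ctx, contactID) // slow path: e.g. a DHT-backed lookup
	if err != nil {
		return pstore.PeerInfo{}, err
	}

	cache.UpdateCache(contactID, pi)
	return pi, nil
}
```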
4 changes: 0 additions & 4 deletions core/network/host/connmanager.go
@@ -40,10 +40,6 @@ func (cm *BertyConnMgr) reconnect(net inet.Network, pid peer.ID, delay *BackoffD
return
}

if net.Connectedness(pid) == inet.Connected {
return
}

logger().Debug("connmanager: try to reconnect", zap.String("id", pid.Pretty()), zap.Int("retries", retries))
_, err := net.DialPeer(cm.ctx, pid)
if err == nil {
16 changes: 0 additions & 16 deletions core/network/metric/peer.validate.gen.go~merged

This file was deleted.

16 changes: 0 additions & 16 deletions core/network/metric/peer.validate.gen.go~merged_0

This file was deleted.

16 changes: 0 additions & 16 deletions core/network/metric/peer.validate.gen.go~merged_1

This file was deleted.

18 changes: 12 additions & 6 deletions core/network/network.go
@@ -11,22 +11,18 @@ import (
"go.uber.org/zap"
)

const (
MaxStreamUse = 20
)

type Network struct {
config *config.Config

host *host.BertyHost

handler func(context.Context, *entity.Envelope) (*entity.Void, error)

streamManager *StreamManager

updating *sync.Mutex

shutdown context.CancelFunc

cache PeerCache
}

// Chainconfig.Options chains multiple options into a single option.
@@ -53,6 +49,7 @@ func New(ctx context.Context, opts ...config.Option) (*Network, error) {
config: &config.Config{},
updating: &sync.Mutex{},
shutdown: cancel,
cache: NewNoopCache(),
}

if err := net.config.Apply(ctx, opts...); err != nil {
@@ -66,6 +63,10 @@ func New(ctx context.Context, opts ...config.Option) (*Network, error) {
return nil, err
}

if net.Config().PeerCache {
net.cache = NewPeerCache(net.host)
}

net.init(ctx)

return net, nil
@@ -96,6 +97,7 @@ func (net *Network) Update(ctx context.Context, opts ...config.Option) error {
updating: net.updating,
handler: net.handler,
shutdown: cancel,
cache: NewNoopCache(),
}

if err := update.config.Apply(ctx, append([]config.Option{WithConfig(net.config)}, opts...)...); err != nil {
@@ -109,6 +111,10 @@ func (net *Network) Update(ctx context.Context, opts ...config.Option) error {
return err
}

if update.Config().PeerCache {
net.cache = NewPeerCache(update.host)
}

net.Close(ctx)

*net = *update
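Putting it together, a hedged sketch of constructing a Network with the cache enabled. The import paths and alias are assumed from the repository layout (`berty.tech/core/...`), only `PeerCache` is set on the Config, and a real setup would still need the usual transport and bootstrap options.

```go
package main

import (
	"context"
	"log"

	network "berty.tech/core/network"
	netconfig "berty.tech/core/network/config"
)

func main() {
	ctx := context.Background()

	// PeerCache: true makes New wire NewPeerCache(host); false keeps the noop cache.
	net, err := network.New(ctx, network.WithConfig(&netconfig.Config{PeerCache: true}))
	if err != nil {
		log.Fatal(err)
	}
	defer net.Close(ctx)
}
```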
