Skip to content

Commit

Permalink
feat: join the local and remote peer caches
Browse files Browse the repository at this point in the history
Signed-off-by: remi <remi@berty.tech>
  • Loading branch information
d4ryl00-sudo committed Jun 23, 2020
1 parent 72c6c89 commit 53b3473
Showing 1 changed file with 123 additions and 61 deletions.
184 changes: 123 additions & 61 deletions go/internal/tinder/driver_localdiscovery.go
Expand Up @@ -2,6 +2,7 @@ package tinder

import (
"context"
"errors"
"io"
"math"
mrand "math/rand"
Expand All @@ -25,14 +26,12 @@ import (
const recProtocolID = protocol.ID("berty/p2p/localrecord")

type localDiscovery struct {
logger *zap.Logger
host host.Host
localPeerCache map[string]int64
remotePeerCache map[string]*pCache
localPeerCacheMux sync.RWMutex
remotePeerCacheMux sync.RWMutex
rng *mrand.Rand
rngMux sync.Mutex
logger *zap.Logger
host host.Host
peerCache map[string]*pCache
peerCacheMux sync.RWMutex
rng *mrand.Rand
rngMux sync.Mutex
}

type pCache struct {
Expand All @@ -52,19 +51,92 @@ type StreamWrapper struct {

func NewLocalDiscovery(logger *zap.Logger, host host.Host, rng *mrand.Rand) Driver {
ld := &localDiscovery{
logger: logger.Named("tinder/localDiscovery"),
host: host,
rng: rng,
localPeerCache: make(map[string]int64),
remotePeerCache: make(map[string]*pCache),
logger: logger.Named("tinder/localDiscovery"),
host: host,
rng: rng,
peerCache: make(map[string]*pCache),
}
host.Network().Notify(ld)
host.SetStreamHandler(recProtocolID, ld.handleStream)
return ld
}

func (ld *localDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
ld.logger.Debug("localDiscovery: Advertise", zap.String("CID", ns))
// Delete expired entries of a CID in the peer cache
func (ld *localDiscovery) cleanPeerCache(cid string) {
currentTime := time.Now().Unix()

ld.peerCacheMux.RLock()
_, ok := ld.peerCache[cid]
ld.peerCacheMux.RUnlock()
if ok {
ld.peerCacheMux.Lock()
cache, ok := ld.peerCache[cid]
if ok {
for p := range cache.recs {
expire := cache.recs[p].expire
if expire < currentTime {
delete(cache.recs, p)
}
}
}
}
}

// Update the peer cache
func (ld *localDiscovery) updatePeerCache(cid string, peerID peer.ID, addrInfo peer.AddrInfo, expire int64) {
ld.peerCacheMux.RLock()
cache, ok := ld.peerCache[cid]
ld.peerCacheMux.RUnlock()
if !ok {
ld.peerCacheMux.Lock()
cache, ok = ld.peerCache[cid]
if !ok {
cache = &pCache{recs: make(map[peer.ID]*pRecord)}
ld.peerCache[cid] = cache
}
ld.peerCacheMux.Unlock()
}
cache.mux.Lock()
cache.recs[peerID] = &pRecord{peer: addrInfo, expire: expire}
cache.mux.Unlock()
}

// Return the cache size of a cid
func (ld *localDiscovery) peerCacheLen(cid string) int {
var size int = 0

ld.peerCacheMux.RLock()
cache, ok := ld.peerCache[cid]
ld.peerCacheMux.RUnlock()
if ok {
cache.mux.Lock()
size = len(cache.recs)
cache.mux.Unlock()
}
return size
}

// Delete an entry of the peer cache
func (ld *localDiscovery) deletePeerCacheEntry(cid string, peerID peer.ID) error {
ld.peerCacheMux.RLock()
cache, ok := ld.peerCache[cid]
ld.peerCacheMux.RUnlock()
if ok {
cache.mux.Lock()
_, ok := cache.recs[peerID]
if ok {
delete(cache.recs, peerID)
}
cache.mux.Unlock()
} else {
ld.logger.Error("CID not found from the local peer cache")
return errors.New("delete failed: CID not found")
}
return nil
}

func (ld *localDiscovery) Advertise(ctx context.Context, cid string, opts ...discovery.Option) (time.Duration, error) {
ld.logger.Debug("localDiscovery: Advertise", zap.String("CID", cid))
// Get options
var options discovery.Options
err := options.Apply(opts...)
Expand All @@ -87,24 +159,14 @@ func (ld *localDiscovery) Advertise(ctx context.Context, ns string, opts ...disc
ttlSeconds = int(math.Round(ttl.Seconds()))
}

ld.localPeerCacheMux.Lock()
defer ld.localPeerCacheMux.Unlock()

currentTime := time.Now().Unix()
newCacheSize := len(ld.localPeerCache)

// Remove all expired entries from cache
for p := range ld.localPeerCache {
expire := ld.localPeerCache[p]
if expire < currentTime {
newCacheSize--
delete(ld.localPeerCache, p)
}
}

// delete expired entries before adding a new entry
ld.cleanPeerCache(cid)
peerCacheLen := ld.peerCacheLen(cid)
// Discover new records if we don't have enough
if newCacheSize < limit {
ld.localPeerCache[ns] = int64(ttlSeconds) + currentTime
if peerCacheLen < limit {
currentTime := time.Now().Unix()
expire := int64(ttlSeconds) + currentTime
ld.updatePeerCache(cid, ld.host.ID(), ld.host.Peerstore().PeerInfo(ld.host.ID()), expire)
} else {
ld.logger.Info("localDiscovery: Advertise limit reached")
}
Expand All @@ -129,17 +191,17 @@ func (ld *localDiscovery) FindPeers(ctx context.Context, ns string, opts ...disc
// Get cached peers
var cache *pCache

ld.remotePeerCacheMux.RLock()
cache, ok := ld.remotePeerCache[ns]
ld.remotePeerCacheMux.RUnlock()
ld.peerCacheMux.RLock()
cache, ok := ld.peerCache[ns]
ld.peerCacheMux.RUnlock()
if !ok {
ld.remotePeerCacheMux.Lock()
cache, ok = ld.remotePeerCache[ns]
ld.peerCacheMux.Lock()
cache, ok = ld.peerCache[ns]
if !ok {
cache = &pCache{recs: make(map[peer.ID]*pRecord)}
ld.remotePeerCache[ns] = cache
ld.peerCache[ns] = cache
}
ld.remotePeerCacheMux.Unlock()
ld.peerCacheMux.Unlock()
}

cache.mux.Lock()
Expand Down Expand Up @@ -194,18 +256,9 @@ func (ld *localDiscovery) FindPeers(ctx context.Context, ns string, opts ...disc
}

// Remove the entry from the local peer cache
func (ld *localDiscovery) Unregister(ctx context.Context, ns string) error {
ld.localPeerCacheMux.Lock()
defer ld.localPeerCacheMux.Unlock()

_, ok := ld.localPeerCache[ns]
if !ok {
ld.logger.Error("CID not found from the local peer cache")
// TODO: should return an error?
} else {
delete(ld.localPeerCache, ns)
}
return nil
func (ld *localDiscovery) Unregister(ctx context.Context, cid string) error {
err := ld.deletePeerCacheEntry(cid, ld.host.ID())
return err
}

func (*localDiscovery) Name() string { return "localDiscovery" }
Expand Down Expand Up @@ -240,10 +293,19 @@ func (ld *localDiscovery) sendLocalRecord(c network.Conn) error {
ReadWriter: lzcon,
}

ld.localPeerCacheMux.RLock()
defer ld.localPeerCacheMux.RUnlock()
// constuct a minimal map with local advertised cid
localCache := make(map[string]int64)
ld.peerCacheMux.RLock()
for c := range ld.peerCache {
ld.peerCache[c].mux.Lock()
if rec, ok := ld.peerCache[c].recs[ld.host.ID()]; ok {
localCache[c] = rec.expire
}
ld.peerCache[c].mux.Unlock()
}
ld.peerCacheMux.RUnlock()

lr := &LocalRecord{Records: ld.localPeerCache}
lr := &LocalRecord{Records: localCache}
pbw := ggio.NewDelimitedWriter(sw)
if err := pbw.WriteMsg(lr); err != nil {
ld.logger.Error("localDiscovery: sendLocalRecord", zap.Error(err))
Expand Down Expand Up @@ -277,17 +339,17 @@ func (ld *localDiscovery) handleStream(s network.Stream) {
zap.String("remoteID", s.Conn().RemotePeer().String()),
zap.String("CID", rec),
zap.Int64("expire", lr.Records[rec]))
ld.remotePeerCacheMux.RLock()
cache, ok := ld.remotePeerCache[rec]
ld.remotePeerCacheMux.RUnlock()
ld.peerCacheMux.RLock()
cache, ok := ld.peerCache[rec]
ld.peerCacheMux.RUnlock()
if !ok {
ld.remotePeerCacheMux.Lock()
cache, ok = ld.remotePeerCache[rec]
ld.peerCacheMux.Lock()
cache, ok = ld.peerCache[rec]
if !ok {
cache = &pCache{recs: make(map[peer.ID]*pRecord)}
ld.remotePeerCache[rec] = cache
ld.peerCache[rec] = cache
}
ld.remotePeerCacheMux.Unlock()
ld.peerCacheMux.Unlock()
}
cache.mux.Lock()
addrInfo := ld.host.Peerstore().PeerInfo(s.Conn().RemotePeer())
Expand Down

0 comments on commit 53b3473

Please sign in to comment.