Skip to content

Commit

Permalink
feat: change map to struct of fields in the LocalRecord protobuf
Browse files Browse the repository at this point in the history
Signed-off-by: D4ryl00 <remi@berty.tech>
  • Loading branch information
d4ryl00-sudo committed Jun 23, 2020
1 parent e291cec commit e14d081
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 140 deletions.
6 changes: 5 additions & 1 deletion api/go-internal/localrecord.proto
Expand Up @@ -12,5 +12,9 @@ option (gogoproto.unmarshaler_all) = true;
option (gogoproto.sizer_all) = true;

message LocalRecord {
map<string, int64> records = 1;
message Record {
string cid = 1;
int64 expire = 2;
}
repeated Record records = 1;
}
2 changes: 1 addition & 1 deletion go/gen.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 15 additions & 24 deletions go/internal/tinder/driver_localdiscovery.go
Expand Up @@ -57,7 +57,7 @@ var _ discovery.Discovery = (*localDiscovery)(nil)
// https://github.com/libp2p/go-libp2p-core/blob/master/network/notifee.go
var _ network.Notifiee = (*localDiscovery)(nil)

// LocalDiscovery is a network.Notifiee
// LocalDiscovery is a Driver
var _ Driver = (*localDiscovery)(nil)

func NewLocalDiscovery(logger *zap.Logger, host host.Host, rng *mrand.Rand) Driver {
Expand Down Expand Up @@ -218,20 +218,11 @@ func (ld *localDiscovery) FindPeers(ctx context.Context, cid string, opts ...dis
ld.peerCacheMux.Unlock()
}

cache.mux.Lock()
defer cache.mux.Unlock()

// Remove all expired entries from cache
currentTime := time.Now().Unix()
newCacheSize := len(cache.recs)
ld.cleanPeerCache(cid)

for p := range cache.recs {
rec := cache.recs[p]
if rec.expire < currentTime {
newCacheSize--
delete(cache.recs, p)
}
}
cache.mux.Lock()
defer cache.mux.Unlock()

// Randomize and fill channel with available records
count := len(cache.recs)
Expand Down Expand Up @@ -313,19 +304,19 @@ func (ld *localDiscovery) sendLocalRecord(c network.Conn) error {
ReadWriter: lzcon,
}

// constuct a minimal map with local advertised cid
localCache := make(map[string]int64)
// Fill the protobuf LocalRecord struct
lr := &LocalRecord{Records: []*LocalRecord_Record{}}
ld.peerCacheMux.RLock()
for c := range ld.peerCache {
ld.peerCache[c].mux.Lock()
if rec, ok := ld.peerCache[c].recs[ld.host.ID()]; ok {
localCache[c] = rec.expire
record := &LocalRecord_Record{Cid: c, Expire: rec.expire}
lr.Records = append(lr.Records, record)
}
ld.peerCache[c].mux.Unlock()
}
ld.peerCacheMux.RUnlock()

lr := &LocalRecord{Records: localCache}
pbw := ggio.NewDelimitedWriter(sw)
if err := pbw.WriteMsg(lr); err != nil {
ld.logger.Error("localDiscovery: sendLocalRecord", zap.Error(err))
Expand Down Expand Up @@ -355,26 +346,26 @@ func (ld *localDiscovery) handleStream(s network.Stream) {
}

// fill the remote cache
for rec := range lr.Records {
for _, rec := range lr.Records {
ld.logger.Debug("saving remote record in cache",
zap.String("remoteID", s.Conn().RemotePeer().String()),
zap.String("CID", rec),
zap.Int64("expire", lr.Records[rec]))
zap.String("CID", rec.Cid),
zap.Int64("expire", rec.Expire))
ld.peerCacheMux.RLock()
cache, ok := ld.peerCache[rec]
cache, ok := ld.peerCache[rec.Cid]
ld.peerCacheMux.RUnlock()
if !ok {
ld.peerCacheMux.Lock()
cache, ok = ld.peerCache[rec]
cache, ok = ld.peerCache[rec.Cid]
if !ok {
cache = &pCache{recs: make(map[peer.ID]*pRecord)}
ld.peerCache[rec] = cache
ld.peerCache[rec.Cid] = cache
}
ld.peerCacheMux.Unlock()
}
cache.mux.Lock()
addrInfo := ld.host.Peerstore().PeerInfo(s.Conn().RemotePeer())
cache.recs[s.Conn().RemotePeer()] = &pRecord{peer: addrInfo, expire: lr.Records[rec]}
cache.recs[s.Conn().RemotePeer()] = &pRecord{peer: addrInfo, expire: rec.Expire}
cache.mux.Unlock()
}
}
Expand Down

0 comments on commit e14d081

Please sign in to comment.