Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
stream: introduce caching on localstore Sets
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Nov 28, 2019
1 parent 2bda34a commit 52fb861
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion network/stream/sync_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
const (
syncStreamName = "SYNC"
cacheCapacity = 10000
setCacheCapacity = 50000 // 50000 * 32 = ~1.6mb
maxBinZeroSyncPeers = 3
)

Expand All @@ -50,6 +51,8 @@ type syncProvider struct {
quit chan struct{} // shutdown
cacheMtx sync.RWMutex // synchronization primitive to protect cache
cache *lru.Cache // cache to minimize load on netstore
setCacheMtx sync.RWMutex // set cache mutex
setCache *lru.Cache // cache to reduce load on localstore to not set the same chunk as synced
logger log.Logger // logger that appends the base address to loglines
binZeroSem chan struct{} // semaphore to limit number of syncing peers on bin 0
}
Expand All @@ -63,6 +66,10 @@ func NewSyncProvider(ns *storage.NetStore, kad *network.Kademlia, autostart bool
if err != nil {
panic(err)
}
sc, err := lru.New(cacheCapacity)
if err != nil {
panic(err)
}

return &syncProvider{
netStore: ns,
Expand All @@ -72,6 +79,7 @@ func NewSyncProvider(ns *storage.NetStore, kad *network.Kademlia, autostart bool
name: syncStreamName,
quit: make(chan struct{}),
cache: c,
setCache: sc,
logger: log.New("base", hex.EncodeToString(kad.BaseAddr()[:16])),
binZeroSem: make(chan struct{}, maxBinZeroSyncPeers),
}
Expand Down Expand Up @@ -190,11 +198,28 @@ func (s *syncProvider) Get(ctx context.Context, addr ...chunk.Address) ([]chunk.

// Set the supplied addrs as synced in order to allow for garbage collection
func (s *syncProvider) Set(ctx context.Context, addrs ...chunk.Address) error {
err := s.netStore.Set(ctx, chunk.ModeSetSyncPull, addrs...)
var chunksToSet []chunk.Address

s.setCacheMtx.RLock()
for _, addr := range addrs {
if _, ok := s.setCache.Get(addr); !ok {
chunksToSet = append(chunksToSet, addr)
}
}
s.setCacheMtx.RUnlock()

err := s.netStore.Set(ctx, chunk.ModeSetSyncPull, chunksToSet...)
if err != nil {
metrics.GetOrRegisterCounter("syncProvider.set-sync-err", nil).Inc(1)
return err
}

s.setCacheMtx.Lock()
defer s.setCacheMtx.Unlock()

for _, addr := range chunksToSet {
s.setCache.Add(addr, struct{}{})
}
return nil
}

Expand Down

0 comments on commit 52fb861

Please sign in to comment.