Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

localstore: fix index leak when NN chunks enter localstore #1940

Merged
merged 8 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,15 @@ func (k *Kademlia) IsClosestTo(addr []byte, filter func(*BzzPeer) bool) (closest
return closest
}

// IsWithinDepth reports whether the given address lies inside this
// node's neighbourhood, i.e. within its current saturation depth.
func (k *Kademlia) IsWithinDepth(addr []byte) bool {
	// an address is "within depth" when its proximity order relative
	// to this node's base address is at least the neighbourhood depth
	po, _ := Pof(addr, k.base, 0)
	return po >= k.NeighbourhoodDepth()
}

// BaseAddr return the kademlia base address
func (k *Kademlia) BaseAddr() []byte {
return k.base
Expand Down
12 changes: 12 additions & 0 deletions storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type DB struct {
// garbage collection and gc size write workers
// are done
collectGarbageWorkerDone chan struct{}

putToGCCheck func([]byte) bool
}

// Options struct holds optional parameters for configuring DB.
Expand All @@ -133,6 +135,10 @@ type Options struct {
// MetricsPrefix defines a prefix for metrics names.
MetricsPrefix string
Tags *chunk.Tags
// PutToGCCheck is a function called after a Put of a chunk
// to verify whether that chunk needs to be Set and added to
// garbage collection index too
PutToGCCheck func([]byte) bool
}

// New returns a new DB. All fields and indexes are initialized
Expand All @@ -145,6 +151,11 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
Capacity: defaultCapacity,
}
}

if o.PutToGCCheck == nil {
o.PutToGCCheck = func(_ []byte) bool { return false }
}

db = &DB{
capacity: o.Capacity,
baseKey: baseKey,
Expand All @@ -156,6 +167,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
collectGarbageTrigger: make(chan struct{}, 1),
close: make(chan struct{}),
collectGarbageWorkerDone: make(chan struct{}),
putToGCCheck: o.PutToGCCheck,
}
if db.capacity <= 0 {
db.capacity = defaultCapacity
Expand Down
133 changes: 95 additions & 38 deletions storage/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) (
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}

return exist, err
}

Expand Down Expand Up @@ -94,7 +95,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
exist[i] = true
continue
}
exists, err := db.putUpload(batch, binIDs, chunkToItem(ch))
exists, c, err := db.putUpload(batch, binIDs, chunkToItem(ch))
if err != nil {
return nil, err
}
Expand All @@ -105,6 +106,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
triggerPullFeed[db.po(ch.Address())] = struct{}{}
triggerPushFeed = true
}
gcSizeChange += c
}

case chunk.ModePutSync:
Expand All @@ -113,7 +115,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
exist[i] = true
continue
}
exists, err := db.putSync(batch, binIDs, chunkToItem(ch))
exists, c, err := db.putSync(batch, binIDs, chunkToItem(ch))
if err != nil {
return nil, err
}
Expand All @@ -123,6 +125,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
// after the batch is successfully written
triggerPullFeed[db.po(ch.Address())] = struct{}{}
}
gcSizeChange += c
}

default:
Expand All @@ -142,6 +145,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
if err != nil {
return nil, err
}

for po := range triggerPullFeed {
db.triggerPullSubscriptions(po)
}
Expand All @@ -157,20 +161,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
// check if the chunk already is in the database
// as gc index is updated
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
exists = true
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
exists = false
// no chunk accesses
default:
return false, 0, err
}
i, err = db.retrievalDataIndex.Get(item)
i, err := db.retrievalDataIndex.Get(item)
switch err {
case nil:
exists = true
Expand All @@ -182,11 +173,6 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
default:
return false, 0, err
}
if item.AccessTimestamp != 0 {
// delete current entry from the gc index
db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange--
}
if item.StoreTimestamp == 0 {
item.StoreTimestamp = now()
}
Expand All @@ -196,13 +182,11 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
return false, 0, err
}
}
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
db.retrievalAccessIndex.PutInBatch(batch, item)
// add new entry to gc index
db.gcIndex.PutInBatch(batch, item)
gcSizeChange++

gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}

db.retrievalDataIndex.PutInBatch(batch, item)

Expand All @@ -213,47 +197,120 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
// - put to indexes: retrieve, push, pull
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) {
func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, err
return false, 0, err
}
if exists {
return true, nil
if db.putToGCCheck(item.Address) {
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return true, 0, nil
}

item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
if err != nil {
return false, err
return false, 0, err
}
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
db.pushIndex.PutInBatch(batch, item)
return false, nil

if db.putToGCCheck(item.Address) {

// TODO: this might result in an edge case where a node
// that has very little storage and uploads using an anonymous
// upload will have some of the content GCd before being able
// to sync it
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return false, gcSizeChange, nil
}

// putSync adds an Item to the batch by updating required indexes:
// - put to indexes: retrieve, pull
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) {
func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, err
return false, 0, err
}
if exists {
return true, nil
if db.putToGCCheck(item.Address) {
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return true, gcSizeChange, nil
}

item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
if err != nil {
return false, err
return false, 0, err
}
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
return false, nil

if db.putToGCCheck(item.Address) {
// TODO: this might result in an edge case where a node
// that has very little storage and uploads using an anonymous
// upload will have some of the content GCd before being able
// to sync it
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return false, gcSizeChange, nil
}

// setGC is a helper function used to add chunks to the retrieval access
// index and the gc index in the cases that the putToGCCheck condition
// vets a gc set. this is to mitigate index leakage in edge cases where
acud marked this conversation as resolved.
Show resolved Hide resolved
// a chunk is added to a node's localstore and given that the chunk is
// already within that node's NN (thus, it can be added to the gc index
// safely)
func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) {
if item.BinID <= 0 {
acud marked this conversation as resolved.
Show resolved Hide resolved
i, err := db.retrievalDataIndex.Get(item)
if err != nil {
return 0, err
}
item.BinID = i.BinID
}
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange--
case leveldb.ErrNotFound:
// the chunk is not accessed before
default:
return 0, err
}
item.AccessTimestamp = now()
db.retrievalAccessIndex.PutInBatch(batch, item)

db.gcIndex.PutInBatch(batch, item)
gcSizeChange++

return gcSizeChange, nil
}

// incBinID is a helper function for db.put* methods that increments bin id
Expand Down
Loading