Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
Revert "localstore: fix index leak when NN chunks enter localstore (#…
Browse files Browse the repository at this point in the history
…1940)"

This reverts commit 70d4fc0.
  • Loading branch information
acud committed Nov 20, 2019
1 parent 3cf1468 commit e586d5b
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 248 deletions.
9 changes: 0 additions & 9 deletions network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,15 +862,6 @@ func (k *Kademlia) IsClosestTo(addr []byte, filter func(*BzzPeer) bool) (closest
return closest
}

// IsWithinDepth reports whether the given address falls within this
// node's saturation depth. It compares the value Pof computes for addr
// against the kademlia base address (presumably the proximity order;
// confirm against Pof) with the current NeighbourhoodDepth. The second
// value returned by Pof is not needed here and is discarded.
func (k *Kademlia) IsWithinDepth(addr []byte) bool {
	// read the depth first, then measure addr against the base address
	neighbourhoodDepth := k.NeighbourhoodDepth()
	proximity, _ := Pof(addr, k.base, 0)

	// within depth means at least as close as the neighbourhood depth
	return !(proximity < neighbourhoodDepth)
}

// BaseAddr returns the kademlia base address
func (k *Kademlia) BaseAddr() []byte {
return k.base
Expand Down
12 changes: 0 additions & 12 deletions storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ type DB struct {
// garbage collection and gc size write workers
// are done
collectGarbageWorkerDone chan struct{}

putToGCCheck func([]byte) bool
}

// Options struct holds optional parameters for configuring DB.
Expand All @@ -135,10 +133,6 @@ type Options struct {
// MetricsPrefix defines a prefix for metrics names.
MetricsPrefix string
Tags *chunk.Tags
// PutToGCCheck is a function called after a Put of a chunk
// to verify whether that chunk also needs to be Set and added
// to the garbage collection index
PutToGCCheck func([]byte) bool
}

// New returns a new DB. All fields and indexes are initialized
Expand All @@ -151,11 +145,6 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
Capacity: defaultCapacity,
}
}

if o.PutToGCCheck == nil {
o.PutToGCCheck = func(_ []byte) bool { return false }
}

db = &DB{
capacity: o.Capacity,
baseKey: baseKey,
Expand All @@ -167,7 +156,6 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
collectGarbageTrigger: make(chan struct{}, 1),
close: make(chan struct{}),
collectGarbageWorkerDone: make(chan struct{}),
putToGCCheck: o.PutToGCCheck,
}
if db.capacity <= 0 {
db.capacity = defaultCapacity
Expand Down
136 changes: 39 additions & 97 deletions storage/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) (
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}

return exist, err
}

Expand Down Expand Up @@ -95,7 +94,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
exist[i] = true
continue
}
exists, c, err := db.putUpload(batch, binIDs, chunkToItem(ch))
exists, err := db.putUpload(batch, binIDs, chunkToItem(ch))
if err != nil {
return nil, err
}
Expand All @@ -106,7 +105,6 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
triggerPullFeed[db.po(ch.Address())] = struct{}{}
triggerPushFeed = true
}
gcSizeChange += c
}

case chunk.ModePutSync:
Expand All @@ -115,7 +113,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
exist[i] = true
continue
}
exists, c, err := db.putSync(batch, binIDs, chunkToItem(ch))
exists, err := db.putSync(batch, binIDs, chunkToItem(ch))
if err != nil {
return nil, err
}
Expand All @@ -125,7 +123,6 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
// after the batch is successfully written
triggerPullFeed[db.po(ch.Address())] = struct{}{}
}
gcSizeChange += c
}

default:
Expand All @@ -145,7 +142,6 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
if err != nil {
return nil, err
}

for po := range triggerPullFeed {
db.triggerPullSubscriptions(po)
}
Expand All @@ -161,7 +157,20 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
i, err := db.retrievalDataIndex.Get(item)
// check if the chunk is already in the database,
// because in that case its gc index entry has to be updated
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
exists = true
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
exists = false
// no chunk accesses
default:
return false, 0, err
}
i, err = db.retrievalDataIndex.Get(item)
switch err {
case nil:
exists = true
Expand All @@ -173,6 +182,11 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
default:
return false, 0, err
}
if item.AccessTimestamp != 0 {
// delete current entry from the gc index
db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange--
}
if item.StoreTimestamp == 0 {
item.StoreTimestamp = now()
}
Expand All @@ -182,11 +196,13 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
return false, 0, err
}
}

gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
db.retrievalAccessIndex.PutInBatch(batch, item)
// add new entry to gc index
db.gcIndex.PutInBatch(batch, item)
gcSizeChange++

db.retrievalDataIndex.PutInBatch(batch, item)

Expand All @@ -197,20 +213,13 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
// - put to indexes: retrieve, push, pull
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) {
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, 0, err
return false, err
}
if exists {
if db.putToGCCheck(item.Address) {
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return true, 0, nil
return true, nil
}
anonymous := false
if db.tags != nil && item.Tag != 0 {
Expand All @@ -224,103 +233,36 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed
item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
if err != nil {
return false, 0, err
return false, err
}
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
if !anonymous {
db.pushIndex.PutInBatch(batch, item)
}

if db.putToGCCheck(item.Address) {

// TODO: this might result in an edge case where a node
// that has very little storage and uploads using an anonymous
// upload will have some of the content GCd before being able
// to sync it
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return false, gcSizeChange, nil
db.pushIndex.PutInBatch(batch, item)
return false, nil
}

// putSync adds an Item to the batch by updating required indexes:
// - put to indexes: retrieve, pull
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) {
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, 0, err
return false, err
}
if exists {
if db.putToGCCheck(item.Address) {
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return true, gcSizeChange, nil
return true, nil
}

item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
if err != nil {
return false, 0, err
return false, err
}
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)

if db.putToGCCheck(item.Address) {
// TODO: this might result in an edge case where a node
// that has very little storage and uploads using an anonymous
// upload will have some of the content GCd before being able
// to sync it
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return false, gcSizeChange, nil
}

// setGC is a helper function used to add chunks to the retrieval access
// index and the gc index in the cases where the putToGCCheck condition
// warrants a gc set. This is to mitigate index leakage in edge cases where
// a chunk is added to a node's localstore while that chunk is already
// within that node's NN (and can therefore be added to the gc index
// safely).
func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) {
if item.BinID == 0 {
i, err := db.retrievalDataIndex.Get(item)
if err != nil {
return 0, err
}
item.BinID = i.BinID
}
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
item.AccessTimestamp = i.AccessTimestamp
db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange--
case leveldb.ErrNotFound:
// the chunk is not accessed before
default:
return 0, err
}
item.AccessTimestamp = now()
db.retrievalAccessIndex.PutInBatch(batch, item)

db.gcIndex.PutInBatch(batch, item)
gcSizeChange++

return gcSizeChange, nil
return false, nil
}

// incBinID is a helper function for db.put* methods that increments bin id
Expand Down
Loading

0 comments on commit e586d5b

Please sign in to comment.