Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
localstore: add a composable function in localstore that checks whether
Browse files Browse the repository at this point in the history
a chunk needs to be Set after being Put into the database
  • Loading branch information
acud committed Nov 11, 2019
1 parent 52b83a4 commit e10b1e4
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 47 deletions.
7 changes: 7 additions & 0 deletions network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,13 @@ func (k *Kademlia) IsClosestTo(addr []byte, filter func(*BzzPeer) bool) (closest
return closest
}

// IsAddressWithinDepth reports whether addr falls within this node's
// nearest neighbourhood, i.e. whether its proximity order relative to
// the kademlia base address is at least the current neighbourhood depth.
func (k *Kademlia) IsAddressWithinDepth(addr []byte) bool {
	depth := k.NeighbourhoodDepth()
	proximity, _ := Pof(addr, k.base, 0)
	return proximity >= depth
}

// BaseAddr return the kademlia base address
func (k *Kademlia) BaseAddr() []byte {
return k.base
Expand Down
14 changes: 13 additions & 1 deletion storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type DB struct {
// garbage collection and gc size write workers
// are done
collectGarbageWorkerDone chan struct{}

putSetCheckFn func([]byte) bool
}

// Options struct holds optional parameters for configuring DB.
Expand All @@ -133,6 +135,10 @@ type Options struct {
// MetricsPrefix defines a prefix for metrics names.
MetricsPrefix string
Tags *chunk.Tags
// PutSetCheckFunc is a function called after a Put of a chunk
// to verify whether that chunk needs to be Set and added to
// garbage collection index too
PutSetCheckFunc func([]byte) bool
}

// New returns a new DB. All fields and indexes are initialized
Expand All @@ -142,7 +148,12 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
if o == nil {
// default options
o = &Options{
Capacity: defaultCapacity,
Capacity: defaultCapacity,
PutSetCheckFunc: func(_ []byte) bool { return false },
}
} else {
if o.PutSetCheckFunc == nil {
o.PutSetCheckFunc = func(_ []byte) bool { return false }
}
}
db = &DB{
Expand All @@ -156,6 +167,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
collectGarbageTrigger: make(chan struct{}, 1),
close: make(chan struct{}),
collectGarbageWorkerDone: make(chan struct{}),
putSetCheckFn: o.PutSetCheckFunc,
}
if db.capacity <= 0 {
db.capacity = defaultCapacity
Expand Down
117 changes: 79 additions & 38 deletions storage/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) (
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
addrs := make([]chunk.Address, 0)
for _, c := range chs {
addrs = append(addrs, c.Address())
}

return exist, err
}

Expand Down Expand Up @@ -94,7 +99,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
exist[i] = true
continue
}
exists, err := db.putUpload(batch, binIDs, chunkToItem(ch))
exists, c, err := db.putUpload(batch, binIDs, chunkToItem(ch))
if err != nil {
return nil, err
}
Expand All @@ -105,6 +110,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
triggerPullFeed[db.po(ch.Address())] = struct{}{}
triggerPushFeed = true
}
gcSizeChange += c
}

case chunk.ModePutSync:
Expand All @@ -113,7 +119,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
exist[i] = true
continue
}
exists, err := db.putSync(batch, binIDs, chunkToItem(ch))
exists, c, err := db.putSync(batch, binIDs, chunkToItem(ch))
if err != nil {
return nil, err
}
Expand All @@ -123,6 +129,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
// after the batch is successfully written
triggerPullFeed[db.po(ch.Address())] = struct{}{}
}
gcSizeChange += c
}

default:
Expand All @@ -142,6 +149,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
if err != nil {
return nil, err
}

for po := range triggerPullFeed {
db.triggerPullSubscriptions(po)
}
Expand All @@ -157,20 +165,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
// check if the chunk already is in the database
// as gc index is updated
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
exists = true
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
exists = false
// no chunk accesses
default:
return false, 0, err
}
i, err = db.retrievalDataIndex.Get(item)
i, err := db.retrievalDataIndex.Get(item)
switch err {
case nil:
exists = true
Expand All @@ -182,11 +177,6 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
default:
return false, 0, err
}
if item.AccessTimestamp != 0 {
// delete current entry from the gc index
db.gcIndex.DeleteInBatch(batch, item)
gcSizeChange--
}
if item.StoreTimestamp == 0 {
item.StoreTimestamp = now()
}
Expand All @@ -196,13 +186,11 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
return false, 0, err
}
}
// update access timestamp
item.AccessTimestamp = now()
// update retrieve access index
db.retrievalAccessIndex.PutInBatch(batch, item)
// add new entry to gc index
db.gcIndex.PutInBatch(batch, item)
gcSizeChange++

gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}

db.retrievalDataIndex.PutInBatch(batch, item)

Expand All @@ -213,47 +201,100 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she
// - put to indexes: retrieve, push, pull
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) {
func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, err
return false, 0, err
}
if exists {
return true, nil
return true, 0, nil
}

item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
if err != nil {
return false, err
return false, 0, err
}
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
db.pushIndex.PutInBatch(batch, item)
return false, nil

if db.putSetCheckFn(item.Address[:]) {

// TODO: this might result in an edge case where a node
// that has very little storage and uploads using an anonymous
// upload will have some of the content GCd before being able
// to sync it
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return false, gcSizeChange, nil
}

// putSync adds an Item to the batch by updating required indexes:
// - put to indexes: retrieve, pull
// The batch can be written to the database.
// Provided batch and binID map are updated.
func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) {
func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) {
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return false, err
return false, 0, err
}
if exists {
return true, nil
return true, 0, nil
}

item.StoreTimestamp = now()
item.BinID, err = db.incBinID(binIDs, db.po(item.Address))
if err != nil {
return false, err
return false, 0, err
}
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
return false, nil

if db.putSetCheckFn(item.Address[:]) {

// TODO: this might result in an edge case where a node
// that has very little storage and uploads using an anonymous
// upload will have some of the content GCd before being able
// to sync it
gcSizeChange, err = db.setGC(batch, item)
if err != nil {
return false, 0, err
}
}

return false, gcSizeChange, nil
}

// setGC is a helper that adds a chunk to the retrieval access index and
// the gc index when the putSetCheckFn condition vets a gc set. It exists
// to mitigate index leakage in edge cases where a chunk is put into the
// localstore while already being within the node's nearest neighbourhood
// (in which case it is safe to add it to the gc index right away).
func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) {
	previous, err := db.retrievalAccessIndex.Get(item)
	if err == nil {
		// the chunk was accessed before: remove the stale gc entry that
		// is keyed by the old access timestamp
		item.AccessTimestamp = previous.AccessTimestamp
		db.gcIndex.DeleteInBatch(batch, item)
		gcSizeChange--
	} else if err != leveldb.ErrNotFound {
		return 0, err
	}
	// refresh the access timestamp and (re)insert the gc entry
	item.AccessTimestamp = now()
	db.retrievalAccessIndex.PutInBatch(batch, item)

	db.gcIndex.PutInBatch(batch, item)
	gcSizeChange++

	return gcSizeChange, nil
}

// incBinID is a helper function for db.put* methods that increments bin id
Expand Down
41 changes: 41 additions & 0 deletions storage/localstore/mode_put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,47 @@ func TestModePut_sameChunk(t *testing.T) {
}
}

// TestModePut_addToGc validates that every ModePut* variant, combined
// with a PutSetCheckFunc stub that always vets a set, results in the
// added chunks showing up in the gc index.
func TestModePut_addToGc(t *testing.T) {
	// PutSetCheckFunc will tell localstore to always Set the chunks that enter
	opts := &Options{PutSetCheckFunc: func(_ []byte) bool { return true }}
	for _, m := range []chunk.ModePut{
		chunk.ModePutSync,
		chunk.ModePutUpload,
		chunk.ModePutRequest,
	} {
		for _, tc := range multiChunkTestCases {
			t.Run(tc.name, func(t *testing.T) {

				db, cleanupFunc := newTestDB(t, opts)
				defer cleanupFunc()

				// freeze the clock so store and access timestamps are
				// deterministic for the gc index assertions below
				wantTimestamp := time.Now().UTC().UnixNano()
				defer setNow(func() (t int64) {
					return wantTimestamp
				})()

				chunks := generateTestRandomChunks(tc.count)

				_, err := db.Put(context.Background(), m, chunks...)
				if err != nil {
					t.Fatal(err)
				}

				// bin ids are assigned sequentially per proximity order bin
				binIDs := make(map[uint8]uint64)

				for _, ch := range chunks {
					po := db.po(ch.Address())
					binIDs[po]++

					newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], nil)(t)
				}
			})
		}
	}
}

// TestPutDuplicateChunks validates the expected behaviour for
// passing duplicate chunks to the Put method.
func TestPutDuplicateChunks(t *testing.T) {
Expand Down
17 changes: 9 additions & 8 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,16 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
log.Info("loaded saved tags successfully from state store")
}

to := network.NewKademlia(
common.FromHex(config.BzzKey),
network.NewKadParams(),
)

localStore, err := localstore.New(config.ChunkDbPath, config.BaseKey, &localstore.Options{
MockStore: mockStore,
Capacity: config.DbCapacity,
Tags: self.tags,
MockStore: mockStore,
Capacity: config.DbCapacity,
Tags: self.tags,
PutSetCheckFunc: to.IsAddressWithinDepth,
})
if err != nil {
return nil, err
Expand All @@ -222,11 +228,6 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e

nodeID := config.Enode.ID()
self.netStore = storage.NewNetStore(lstore, bzzconfig.OverlayAddr, nodeID)

to := network.NewKademlia(
common.FromHex(config.BzzKey),
network.NewKadParams(),
)
self.retrieval = retrieval.New(to, self.netStore, bzzconfig.OverlayAddr, self.swap) // nodeID.Bytes())
self.netStore.RemoteGet = self.retrieval.RequestFromPeers

Expand Down
Loading

0 comments on commit e10b1e4

Please sign in to comment.