diff --git a/network/kademlia.go b/network/kademlia.go index 3e5008e319..fbcbc0bef3 100644 --- a/network/kademlia.go +++ b/network/kademlia.go @@ -758,6 +758,15 @@ func (k *Kademlia) IsClosestTo(addr []byte, filter func(*BzzPeer) bool) (closest return closest } +// IsWithinDepth checks whether a given address falls within +// this node's saturation depth +func (k *Kademlia) IsWithinDepth(addr []byte) bool { + depth := k.NeighbourhoodDepth() + + po, _ := Pof(addr, k.base, 0) + return po >= depth +} + // BaseAddr return the kademlia base address func (k *Kademlia) BaseAddr() []byte { return k.base diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index 1ce094b16d..837b2df0c7 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -117,6 +117,8 @@ type DB struct { // garbage collection and gc size write workers // are done collectGarbageWorkerDone chan struct{} + + putToGCCheck func([]byte) bool } // Options struct holds optional parameters for configuring DB. @@ -133,6 +135,10 @@ type Options struct { // MetricsPrefix defines a prefix for metrics names. MetricsPrefix string Tags *chunk.Tags + // PutToGCCheck is a function called after a Put of a chunk + // to verify whether that chunk needs to be Set and added to + // garbage collection index too + PutToGCCheck func([]byte) bool } // New returns a new DB.
All fields and indexes are initialized @@ -145,6 +151,11 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { Capacity: defaultCapacity, } } + + if o.PutToGCCheck == nil { + o.PutToGCCheck = func(_ []byte) bool { return false } + } + db = &DB{ capacity: o.Capacity, baseKey: baseKey, @@ -156,6 +167,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { collectGarbageTrigger: make(chan struct{}, 1), close: make(chan struct{}), collectGarbageWorkerDone: make(chan struct{}), + putToGCCheck: o.PutToGCCheck, } if db.capacity <= 0 { db.capacity = defaultCapacity diff --git a/storage/localstore/mode_put.go b/storage/localstore/mode_put.go index 70f50bf4a7..bfca64e60c 100644 --- a/storage/localstore/mode_put.go +++ b/storage/localstore/mode_put.go @@ -42,6 +42,7 @@ func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) ( if err != nil { metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) } + return exist, err } @@ -94,7 +95,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err exist[i] = true continue } - exists, err := db.putUpload(batch, binIDs, chunkToItem(ch)) + exists, c, err := db.putUpload(batch, binIDs, chunkToItem(ch)) if err != nil { return nil, err } @@ -105,6 +106,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err triggerPullFeed[db.po(ch.Address())] = struct{}{} triggerPushFeed = true } + gcSizeChange += c } case chunk.ModePutSync: @@ -113,7 +115,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err exist[i] = true continue } - exists, err := db.putSync(batch, binIDs, chunkToItem(ch)) + exists, c, err := db.putSync(batch, binIDs, chunkToItem(ch)) if err != nil { return nil, err } @@ -123,6 +125,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err // after the batch is successfully written triggerPullFeed[db.po(ch.Address())] = struct{}{} } + gcSizeChange 
+= c } default: @@ -142,6 +145,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err if err != nil { return nil, err } + for po := range triggerPullFeed { db.triggerPullSubscriptions(po) } @@ -157,20 +161,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err // The batch can be written to the database. // Provided batch and binID map are updated. func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { - // check if the chunk already is in the database - // as gc index is updated - i, err := db.retrievalAccessIndex.Get(item) - switch err { - case nil: - exists = true - item.AccessTimestamp = i.AccessTimestamp - case leveldb.ErrNotFound: - exists = false - // no chunk accesses - default: - return false, 0, err - } - i, err = db.retrievalDataIndex.Get(item) + i, err := db.retrievalDataIndex.Get(item) switch err { case nil: exists = true @@ -182,11 +173,6 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she default: return false, 0, err } - if item.AccessTimestamp != 0 { - // delete current entry from the gc index - db.gcIndex.DeleteInBatch(batch, item) - gcSizeChange-- - } if item.StoreTimestamp == 0 { item.StoreTimestamp = now() } @@ -196,13 +182,11 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she return false, 0, err } } - // update access timestamp - item.AccessTimestamp = now() - // update retrieve access index - db.retrievalAccessIndex.PutInBatch(batch, item) - // add new entry to gc index - db.gcIndex.PutInBatch(batch, item) - gcSizeChange++ + + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } db.retrievalDataIndex.PutInBatch(batch, item) @@ -213,47 +197,120 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she // - put to indexes: retrieve, push, pull // The batch can be written to the 
database. // Provided batch and binID map are updated. -func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) { +func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { exists, err = db.retrievalDataIndex.Has(item) if err != nil { - return false, err + return false, 0, err } if exists { - return true, nil + if db.putToGCCheck(item.Address) { + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } + } + + return true, gcSizeChange, nil } item.StoreTimestamp = now() item.BinID, err = db.incBinID(binIDs, db.po(item.Address)) if err != nil { - return false, err + return false, 0, err } db.retrievalDataIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) db.pushIndex.PutInBatch(batch, item) - return false, nil + + if db.putToGCCheck(item.Address) { + + // TODO: this might result in an edge case where a node + // that has very little storage and uploads using an anonymous + // upload will have some of the content GCd before being able + // to sync it + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } + } + + return false, gcSizeChange, nil } // putSync adds an Item to the batch by updating required indexes: // - put to indexes: retrieve, pull // The batch can be written to the database. // Provided batch and binID map are updated.
-func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) { +func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { exists, err = db.retrievalDataIndex.Has(item) if err != nil { - return false, err + return false, 0, err } if exists { - return true, nil + if db.putToGCCheck(item.Address) { + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } + } + + return true, gcSizeChange, nil } item.StoreTimestamp = now() item.BinID, err = db.incBinID(binIDs, db.po(item.Address)) if err != nil { - return false, err + return false, 0, err } db.retrievalDataIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) - return false, nil + + if db.putToGCCheck(item.Address) { + // TODO: this might result in an edge case where a node + // that has very little storage and uploads using an anonymous + // upload will have some of the content GCd before being able + // to sync it + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } + } + + return false, gcSizeChange, nil +} + +// setGC is a helper function used to add chunks to the retrieval access +// index and the gc index in the cases that the putToGCCheck condition +// warrants a gc set. 
this is to mitigate index leakage in edge cases where +// a chunk is added to a node's localstore and given that the chunk is +// already within that node's NN (thus, it can be added to the gc index +// safely) +func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) { + if item.BinID == 0 { + i, err := db.retrievalDataIndex.Get(item) + if err != nil { + return 0, err + } + item.BinID = i.BinID + } + i, err := db.retrievalAccessIndex.Get(item) + switch err { + case nil: + item.AccessTimestamp = i.AccessTimestamp + db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange-- + case leveldb.ErrNotFound: + // the chunk is not accessed before + default: + return 0, err + } + item.AccessTimestamp = now() + db.retrievalAccessIndex.PutInBatch(batch, item) + + db.gcIndex.PutInBatch(batch, item) + gcSizeChange++ + + return gcSizeChange, nil } // incBinID is a helper function for db.put* methods that increments bin id diff --git a/storage/localstore/mode_put_test.go b/storage/localstore/mode_put_test.go index be7b99f238..ce2277ae4a 100644 --- a/storage/localstore/mode_put_test.go +++ b/storage/localstore/mode_put_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethersphere/swarm/chunk" + "github.com/syndtr/goleveldb/leveldb" ) // TestModePutRequest validates ModePutRequest index values on the provided DB. 
@@ -318,6 +319,126 @@ func TestModePut_sameChunk(t *testing.T) { } } +// TestModePut_addToGc validates that ModePut* with the PutToGCCheck stub results +// in the added chunk showing up in the GC index +func TestModePut_addToGc(t *testing.T) { + retVal := true + // PutToGCCheck's output is toggled from the test case + opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }} + + for _, m := range []struct { + mode chunk.ModePut + putToGc bool + }{ + {mode: chunk.ModePutSync, putToGc: true}, + {mode: chunk.ModePutSync, putToGc: false}, + {mode: chunk.ModePutUpload, putToGc: true}, + {mode: chunk.ModePutUpload, putToGc: false}, + {mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed + } { + for _, tc := range multiChunkTestCases { + t.Run(tc.name, func(t *testing.T) { + retVal = m.putToGc + + db, cleanupFunc := newTestDB(t, opts) + defer cleanupFunc() + + wantTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantTimestamp + })() + + chunks := generateTestRandomChunks(tc.count) + + _, err := db.Put(context.Background(), m.mode, chunks...)
+ if err != nil { + t.Fatal(err) + } + + binIDs := make(map[uint8]uint64) + + for _, ch := range chunks { + po := db.po(ch.Address()) + binIDs[po]++ + var wantErr error + if !m.putToGc { + wantErr = leveldb.ErrNotFound + } + newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp) + newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], wantErr)(t) + } + }) + } + } +} + +// TestModePut_addToGcExisting validates that ModePut* on existing chunks with the +// PutToGCCheck stub results in the chunk showing up in the GC index +func TestModePut_addToGcExisting(t *testing.T) { + retVal := true + // PutToGCCheck's output is toggled from the test case + opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }} + + for _, m := range []struct { + mode chunk.ModePut + putToGc bool + }{ + {mode: chunk.ModePutSync, putToGc: true}, + {mode: chunk.ModePutSync, putToGc: false}, + {mode: chunk.ModePutUpload, putToGc: true}, + {mode: chunk.ModePutUpload, putToGc: false}, + {mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed + } { + for _, tc := range multiChunkTestCases { + t.Run(tc.name, func(t *testing.T) { + retVal = m.putToGc + + db, cleanupFunc := newTestDB(t, opts) + defer cleanupFunc() + + wantStoreTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantStoreTimestamp + })() + + chunks := generateTestRandomChunks(tc.count) + + _, err := db.Put(context.Background(), m.mode, chunks...) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1 * time.Millisecond) + // change the timestamp, put the chunks again and + // expect the access timestamp to change + wantAccessTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantAccessTimestamp + })() + + _, err = db.Put(context.Background(), m.mode, chunks...)
+ if err != nil { + t.Fatal(err) + } + + binIDs := make(map[uint8]uint64) + + for _, ch := range chunks { + po := db.po(ch.Address()) + binIDs[po]++ + var wantErr error + if !m.putToGc { + wantErr = leveldb.ErrNotFound + } + + newRetrieveIndexesTestWithAccess(db, ch, wantStoreTimestamp, wantAccessTimestamp) + newGCIndexTest(db, ch, wantStoreTimestamp, wantAccessTimestamp, binIDs[po], wantErr)(t) + } + }) + } + } +} + // TestPutDuplicateChunks validates the expected behaviour for // passing duplicate chunks to the Put method. func TestPutDuplicateChunks(t *testing.T) { diff --git a/swarm.go b/swarm.go index 235c8b34aa..0f98387795 100644 --- a/swarm.go +++ b/swarm.go @@ -206,10 +206,16 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e log.Info("loaded saved tags successfully from state store") } + to := network.NewKademlia( + common.FromHex(config.BzzKey), + network.NewKadParams(), + ) + localStore, err := localstore.New(config.ChunkDbPath, config.BaseKey, &localstore.Options{ - MockStore: mockStore, - Capacity: config.DbCapacity, - Tags: self.tags, + MockStore: mockStore, + Capacity: config.DbCapacity, + Tags: self.tags, + PutToGCCheck: to.IsWithinDepth, }) if err != nil { return nil, err @@ -222,11 +228,6 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e nodeID := config.Enode.ID() self.netStore = storage.NewNetStore(lstore, bzzconfig.OverlayAddr, nodeID) - - to := network.NewKademlia( - common.FromHex(config.BzzKey), - network.NewKadParams(), - ) self.retrieval = retrieval.New(to, self.netStore, bzzconfig.OverlayAddr, self.swap) // nodeID.Bytes()) self.netStore.RemoteGet = self.retrieval.RequestFromPeers