From e10b1e480f1309081d7ae81a31a8938d8499e46e Mon Sep 17 00:00:00 2001 From: acud Date: Thu, 7 Nov 2019 15:10:05 +0530 Subject: [PATCH 1/8] localstore: add a composable function in localstore that checks whether a chunk needs to be Set after being Put into the database --- network/kademlia.go | 7 + storage/localstore/localstore.go | 14 +- storage/localstore/mode_put.go | 117 +++++++++----- storage/localstore/mode_put_test.go | 41 +++++ swarm.go | 17 +- sync_test.go | 237 ++++++++++++++++++++++++++++ 6 files changed, 386 insertions(+), 47 deletions(-) create mode 100644 sync_test.go diff --git a/network/kademlia.go b/network/kademlia.go index 3e5008e319..39ac1df3c5 100644 --- a/network/kademlia.go +++ b/network/kademlia.go @@ -758,6 +758,13 @@ func (k *Kademlia) IsClosestTo(addr []byte, filter func(*BzzPeer) bool) (closest return closest } +func (k *Kademlia) IsAddressWithinDepth(addr []byte) bool { + depth := k.NeighbourhoodDepth() + + po, _ := Pof(addr, k.base, 0) + return po >= depth +} + // BaseAddr return the kademlia base address func (k *Kademlia) BaseAddr() []byte { return k.base diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index 1ce094b16d..efcf0092e0 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -117,6 +117,8 @@ type DB struct { // garbage collection and gc size write workers // are done collectGarbageWorkerDone chan struct{} + + putSetCheckFn func([]byte) bool } // Options struct holds optional parameters for configuring DB. @@ -133,6 +135,10 @@ type Options struct { // MetricsPrefix defines a prefix for metrics names. MetricsPrefix string Tags *chunk.Tags + // PutSetCheckFunc is a function called after a Put of a chunk + // to verify whether that chunk needs to be Set and added to + // garbage collection index too + PutSetCheckFunc func([]byte) bool } // New returns a new DB.
All fields and indexes are initialized @@ -142,7 +148,12 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if o == nil { // default options o = &Options{ - Capacity: defaultCapacity, + Capacity: defaultCapacity, + PutSetCheckFunc: func(_ []byte) bool { return false }, + } + } else { + if o.PutSetCheckFunc == nil { + o.PutSetCheckFunc = func(_ []byte) bool { return false } } } db = &DB{ @@ -156,6 +167,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { collectGarbageTrigger: make(chan struct{}, 1), close: make(chan struct{}), collectGarbageWorkerDone: make(chan struct{}), + putSetCheckFn: o.PutSetCheckFunc, } if db.capacity <= 0 { db.capacity = defaultCapacity diff --git a/storage/localstore/mode_put.go b/storage/localstore/mode_put.go index 70f50bf4a7..5c69835bd2 100644 --- a/storage/localstore/mode_put.go +++ b/storage/localstore/mode_put.go @@ -42,6 +42,11 @@ func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) ( if err != nil { metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) } + addrs := make([]chunk.Address, 0) + for _, c := range chs { + addrs = append(addrs, c.Address()) + } + return exist, err } @@ -94,7 +99,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err exist[i] = true continue } - exists, err := db.putUpload(batch, binIDs, chunkToItem(ch)) + exists, c, err := db.putUpload(batch, binIDs, chunkToItem(ch)) if err != nil { return nil, err } @@ -105,6 +110,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err triggerPullFeed[db.po(ch.Address())] = struct{}{} triggerPushFeed = true } + gcSizeChange += c } case chunk.ModePutSync: @@ -113,7 +119,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err exist[i] = true continue } - exists, err := db.putSync(batch, binIDs, chunkToItem(ch)) + exists, c, err := db.putSync(batch, binIDs, chunkToItem(ch)) if err != nil { return nil, err } @@ -123,6 +129,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err // after the batch is successfully written triggerPullFeed[db.po(ch.Address())] = struct{}{} } + gcSizeChange += c } default: @@ -142,6 +149,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err if err != nil { return nil, err } + for po := range triggerPullFeed { db.triggerPullSubscriptions(po) } @@ -157,20 +165,7 @@ func (db *DB) put(mode chunk.ModePut, chs ...chunk.Chunk) (exist []bool, err err // The batch can be written to the database. // Provided batch and binID map are updated. 
func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { - // check if the chunk already is in the database - // as gc index is updated - i, err := db.retrievalAccessIndex.Get(item) - switch err { - case nil: - exists = true - item.AccessTimestamp = i.AccessTimestamp - case leveldb.ErrNotFound: - exists = false - // no chunk accesses - default: - return false, 0, err - } - i, err = db.retrievalDataIndex.Get(item) + i, err := db.retrievalDataIndex.Get(item) switch err { case nil: exists = true @@ -182,11 +177,6 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she default: return false, 0, err } - if item.AccessTimestamp != 0 { - // delete current entry from the gc index - db.gcIndex.DeleteInBatch(batch, item) - gcSizeChange-- - } if item.StoreTimestamp == 0 { item.StoreTimestamp = now() } @@ -196,13 +186,11 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she return false, 0, err } } - // update access timestamp - item.AccessTimestamp = now() - // update retrieve access index - db.retrievalAccessIndex.PutInBatch(batch, item) - // add new entry to gc index - db.gcIndex.PutInBatch(batch, item) - gcSizeChange++ + + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } db.retrievalDataIndex.PutInBatch(batch, item) @@ -213,47 +201,100 @@ func (db *DB) putRequest(batch *leveldb.Batch, binIDs map[uint8]uint64, item she // - put to indexes: retrieve, push, pull // The batch can be written to the database. // Provided batch and binID map are updated. -func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) { +func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { exists, err = db.retrievalDataIndex.Has(item) if err != nil { - return false, err + return false, 0, err } if exists { - return true, nil + return true, 0, nil } item.StoreTimestamp = now() item.BinID, err = db.incBinID(binIDs, db.po(item.Address)) if err != nil { - return false, err + return false, 0, err } db.retrievalDataIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) db.pushIndex.PutInBatch(batch, item) - return false, nil + + if db.putSetCheckFn(item.Address[:]) { + + // TODO: this might result in an edge case where a node + // that has very little storage and uploads using an anonymous + // upload will have some of the content GCd before being able + // to sync it + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } + } + + return false, gcSizeChange, nil } // putSync adds an Item to the batch by updating required indexes: // - put to indexes: retrieve, pull // The batch can be written to the database. // Provided batch and binID map are updated. 
-func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, err error) { +func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.Item) (exists bool, gcSizeChange int64, err error) { exists, err = db.retrievalDataIndex.Has(item) if err != nil { - return false, err + return false, 0, err } if exists { - return true, nil + return true, 0, nil } item.StoreTimestamp = now() item.BinID, err = db.incBinID(binIDs, db.po(item.Address)) if err != nil { - return false, err + return false, 0, err } db.retrievalDataIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) - return false, nil + + if db.putSetCheckFn(item.Address[:]) { + + // TODO: this might result in an edge case where a node + // that has very little storage and uploads using an anonymous + // upload will have some of the content GCd before being able + // to sync it + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } + } + + return false, gcSizeChange, nil +} + +// setGC is a helper function used to add chunks to the retrieval access +// index and the gc index in the cases that the putSetCheckFn condition +// vets a gc set. this is to mitigate index leakage in edge cases where +// a chunk is added to a node's localstore and given that the chunk is +// already within that node's NN (thus, it can be added to the gc index +// safely) +func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) { + i, err := db.retrievalAccessIndex.Get(item) + switch err { + case nil: + item.AccessTimestamp = i.AccessTimestamp + db.gcIndex.DeleteInBatch(batch, item) + gcSizeChange-- + case leveldb.ErrNotFound: + // the chunk is not accessed before + default: + return 0, err + } + item.AccessTimestamp = now() + db.retrievalAccessIndex.PutInBatch(batch, item) + + db.gcIndex.PutInBatch(batch, item) + gcSizeChange++ + + return gcSizeChange, nil } // incBinID is a helper function for db.put* methods that increments bin id diff --git a/storage/localstore/mode_put_test.go b/storage/localstore/mode_put_test.go index be7b99f238..c814cba5be 100644 --- a/storage/localstore/mode_put_test.go +++ b/storage/localstore/mode_put_test.go @@ -318,6 +318,47 @@ func TestModePut_sameChunk(t *testing.T) { } } +// TestModePutSync_addToGc validates ModePut* with PutSetCheckFunc stub results +// in the added chunk to show up in GC index +func TestModePut_addToGc(t *testing.T) { + // PutSetCheckFunc will tell localstore to always Set the chunks that enter + opts := &Options{PutSetCheckFunc: func(_ []byte) bool { return true }} + for _, m := range []chunk.ModePut{ + chunk.ModePutSync, + chunk.ModePutUpload, + chunk.ModePutRequest, + } { + for _, tc := range multiChunkTestCases { + t.Run(tc.name, func(t *testing.T) { + + db, cleanupFunc := newTestDB(t, opts) + defer cleanupFunc() + + wantTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantTimestamp + })() + + chunks := generateTestRandomChunks(tc.count) + + _, err := db.Put(context.Background(), m, chunks...) + if err != nil { + t.Fatal(err) + } + + binIDs := make(map[uint8]uint64) + + for _, ch := range chunks { + po := db.po(ch.Address()) + binIDs[po]++ + + newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], nil)(t) + } + }) + } + } +} + // TestPutDuplicateChunks validates the expected behaviour for // passing duplicate chunks to the Put method. 
func TestPutDuplicateChunks(t *testing.T) { diff --git a/swarm.go b/swarm.go index 235c8b34aa..b1b04ebb79 100644 --- a/swarm.go +++ b/swarm.go @@ -206,10 +206,16 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e log.Info("loaded saved tags successfully from state store") } + to := network.NewKademlia( + common.FromHex(config.BzzKey), + network.NewKadParams(), + ) + localStore, err := localstore.New(config.ChunkDbPath, config.BaseKey, &localstore.Options{ - MockStore: mockStore, - Capacity: config.DbCapacity, - Tags: self.tags, + MockStore: mockStore, + Capacity: config.DbCapacity, + Tags: self.tags, + PutSetCheckFunc: to.IsAddressWithinDepth, }) if err != nil { return nil, err @@ -222,11 +228,6 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e nodeID := config.Enode.ID() self.netStore = storage.NewNetStore(lstore, bzzconfig.OverlayAddr, nodeID) - - to := network.NewKademlia( - common.FromHex(config.BzzKey), - network.NewKadParams(), - ) self.retrieval = retrieval.New(to, self.netStore, bzzconfig.OverlayAddr, self.swap) // nodeID.Bytes()) self.netStore.RemoteGet = self.retrieval.RequestFromPeers diff --git a/sync_test.go b/sync_test.go new file mode 100644 index 0000000000..d95e7c2b81 --- /dev/null +++ b/sync_test.go @@ -0,0 +1,237 @@ +package swarm + +import ( + "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethersphere/swarm/api" + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/network/simulation" + "github.com/ethersphere/swarm/storage" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +var ( + random = rand.New(rand.NewSource(time.Now().UnixNano())) + + bucketKeyLocalStore = "localstore" + bucketKeyAPI = "api" + bucketKeyInspector = "inspector" +) + +func TestSync(t *testing.T) { + var ( + nodeCount = 10 + iterationCount = 5 + fileSize int64 = 60 * 4096 + dbCapacity uint64 = 100 + ) + + sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ + "bootnode": newServiceFunc(true, dbCapacity), + "swarm": newServiceFunc(false, dbCapacity), + }) + defer sim.Close() + + bootnode, err := sim.AddNode(simulation.AddNodeWithService("bootnode")) + if err != nil { + t.Fatal(err) + } + + nodes, err := sim.AddNodes(nodeCount, simulation.AddNodeWithService("swarm")) + if err != nil { + t.Fatal(err) + } + + if err := sim.Net.ConnectNodesStar(nodes, bootnode); err != nil { + t.Fatal(err) + } + + time.Sleep(2 * time.Second) + + for i := 1; i <= iterationCount; i++ { + nodeIndex := 0 + fmt.Println("sync simulation start", "iteration", i, "uploadingNode", nodeIndex) + + startUpload := time.Now() + addr, checksum := uploadRandomFile(t, sim.MustNodeItem(nodes[nodeIndex], bucketKeyAPI).(*api.API), fileSize) + fmt.Println("sync simulation upload", "iteration", i, "upload", time.Since(startUpload), "checksum", checksum) + + startSyncing := time.Now() + + time.Sleep(1 * time.Second) + + for syncing := true; syncing; { + time.Sleep(100 * time.Millisecond) + syncing = false + for _, n := range nodes { + if sim.MustNodeItem(n, bucketKeyInspector).(*api.Inspector).IsPullSyncing() { + syncing = true + } + } + } + fmt.Println("sync simulation syncing", "iteration", i, "syncing", 
time.Since(startSyncing)-api.InspectorIsPullSyncingTolerance) + + retrievalStart := time.Now() + + for ni, n := range nodes { + checkFile(t, sim.MustNodeItem(n, bucketKeyAPI).(*api.API), addr, checksum) + + insp := sim.MustNodeItem(n, bucketKeyInspector).(*api.Inspector) + si, err := insp.StorageIndices() + if err != nil { + t.Fatal(err) + } + if uint64(si["retrievalDataIndex"]) > dbCapacity { + sijson, err := json.MarshalIndent(si, "", " ") + if err != nil { + t.Fatal(err) + } + fmt.Println("sync simulation storage indexes", "iteration", i, "node", ni, "indexes", string(sijson)) + k := sim.MustNodeItem(n, simulation.BucketKeyKademlia).(*network.Kademlia) + fmt.Println("sync simulation kademlia", "iteration", i, "node", ni, "kademlia", k.String()) + x, err := insp.PeerStreams() + if err != nil { + t.Fatal(err) + } + xjson, err := json.MarshalIndent(x, "", " ") + if err != nil { + t.Fatal(err) + } + fmt.Println("sync simulation subscriptions", "iteration", i, "node", ni, "subscriptions", string(xjson)) + + binIDs := make(map[uint8]uint64) + n := sim.MustNodeItem(n, bucketKeyLocalStore).(chunk.Store) + for i := uint8(0); i <= chunk.MaxPO; i++ { + binIDs[i], err = n.LastPullSubscriptionBinID(i) + if err != nil { + t.Fatal(err) + } + } + binIDsjson, err := json.MarshalIndent(binIDs, "", " ") + if err != nil { + t.Fatal(err) + } + fmt.Println("sync simulation binids", "iteration", i, "node", ni, "binids", string(binIDsjson)) + } + } + fmt.Println("sync simulation retrieval", "iteration", i, "retrieval", time.Since(retrievalStart)) + fmt.Println("sync simulation done", "iteration", i, "duration", time.Since(startUpload)-api.InspectorIsPullSyncingTolerance) + } +} + +func newServiceFunc(bootnode bool, dbCapacity uint64) func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + return func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + config := api.NewConfig() + + config.BootnodeMode = bootnode + config.DbCapacity = dbCapacity + config.PushSyncEnabled = false + config.SyncEnabled = true + + dir, err := ioutil.TempDir("", "swarm-sync-test-node") + if err != nil { + return nil, nil, err + } + cleanup = func() { + err := os.RemoveAll(dir) + if err != nil { + log.Error("cleaning up swarm temp dir", "err", err) + } + } + + config.Path = dir + + privkey, err := crypto.GenerateKey() + if err != nil { + return nil, cleanup, err + } + nodekey, err := crypto.GenerateKey() + if err != nil { + return nil, cleanup, err + } + + config.Init(privkey, nodekey) + config.Port = "" + + sw, err := NewSwarm(config, nil) + if err != nil { + return nil, cleanup, err + } + bucket.Store(simulation.BucketKeyKademlia, sw.bzz.Hive.Kademlia) + bucket.Store(bucketKeyLocalStore, sw.netStore.Store) + bucket.Store(bucketKeyAPI, sw.api) + bucket.Store(bucketKeyInspector, sw.inspector) + fmt.Println("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", sw.bzz.BaseAddr())) + return sw, cleanup, nil + } +} + +func uploadRandomFile(t *testing.T, a *api.API, length int64) (chunk.Address, string) { + t.Helper() + + ctx := context.Background() + + hasher := md5.New() + + key, wait, err := a.Store( + ctx, + io.TeeReader(io.LimitReader(random, length), hasher), + length, + false, + ) + if err != nil { + t.Fatalf("store file: %v", err) + } + + if err := wait(ctx); err != nil { + t.Fatalf("wait for file to be stored: %v", err) + } + + return key, hex.EncodeToString(hasher.Sum(nil)) +} + +func storeFile(ctx context.Context, a *api.API, r 
io.Reader, length int64, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { + key, wait, err := a.Store(ctx, r, length, toEncrypt) + if err != nil { + return nil, nil, err + } + return key, wait, nil +} + +func checkFile(t *testing.T, a *api.API, addr chunk.Address, checksum string) { + t.Helper() + + r, _ := a.Retrieve(context.Background(), addr) + + hasher := md5.New() + + n, err := io.Copy(hasher, r) + if err != nil { + t.Fatal(err) + } + + got := hex.EncodeToString(hasher.Sum(nil)) + + if got != checksum { + t.Fatalf("got file checksum %s (length %v), want %s", got, n, checksum) + } +} From e7d5ce761fd7d0e53e629954171208cf0303a21c Mon Sep 17 00:00:00 2001 From: acud Date: Mon, 11 Nov 2019 19:36:11 +0530 Subject: [PATCH 2/8] swarm: remove sync test --- sync_test.go | 237 --------------------------------------------------- 1 file changed, 237 deletions(-) delete mode 100644 sync_test.go diff --git a/sync_test.go b/sync_test.go deleted file mode 100644 index d95e7c2b81..0000000000 --- a/sync_test.go +++ /dev/null @@ -1,237 +0,0 @@ -package swarm - -import ( - "context" - "crypto/md5" - "encoding/hex" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "math/rand" - "os" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethersphere/swarm/api" - "github.com/ethersphere/swarm/chunk" - "github.com/ethersphere/swarm/network" - "github.com/ethersphere/swarm/network/simulation" - "github.com/ethersphere/swarm/storage" -) - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -var ( - random = rand.New(rand.NewSource(time.Now().UnixNano())) - - bucketKeyLocalStore = "localstore" - bucketKeyAPI = "api" - bucketKeyInspector = "inspector" -) - -func TestSync(t *testing.T) { - var ( - nodeCount = 10 - iterationCount = 5 - fileSize int64 = 60 * 4096 - dbCapacity uint64 = 100 - ) - - sim := simulation.NewInProc(map[string]simulation.ServiceFunc{ - "bootnode": newServiceFunc(true, dbCapacity), - "swarm": newServiceFunc(false, dbCapacity), - }) - defer sim.Close() - - bootnode, err := sim.AddNode(simulation.AddNodeWithService("bootnode")) - if err != nil { - t.Fatal(err) - } - - nodes, err := sim.AddNodes(nodeCount, simulation.AddNodeWithService("swarm")) - if err != nil { - t.Fatal(err) - } - - if err := sim.Net.ConnectNodesStar(nodes, bootnode); err != nil { - t.Fatal(err) - } - - time.Sleep(2 * time.Second) - - for i := 1; i <= iterationCount; i++ { - nodeIndex := 0 - fmt.Println("sync simulation start", "iteration", i, "uploadingNode", nodeIndex) - - startUpload := time.Now() - addr, checksum := uploadRandomFile(t, sim.MustNodeItem(nodes[nodeIndex], bucketKeyAPI).(*api.API), fileSize) - fmt.Println("sync simulation upload", "iteration", i, "upload", time.Since(startUpload), "checksum", checksum) - - startSyncing := time.Now() - - time.Sleep(1 * time.Second) - - for syncing := true; syncing; { - time.Sleep(100 * time.Millisecond) - syncing = false - for _, n := range nodes { - if sim.MustNodeItem(n, bucketKeyInspector).(*api.Inspector).IsPullSyncing() { - syncing = true - } - } - } - fmt.Println("sync simulation syncing", "iteration", i, "syncing", time.Since(startSyncing)-api.InspectorIsPullSyncingTolerance) - - retrievalStart := time.Now() - - for ni, n := range nodes { - checkFile(t, sim.MustNodeItem(n, bucketKeyAPI).(*api.API), addr, checksum) - - insp := 
sim.MustNodeItem(n, bucketKeyInspector).(*api.Inspector) - si, err := insp.StorageIndices() - if err != nil { - t.Fatal(err) - } - if uint64(si["retrievalDataIndex"]) > dbCapacity { - sijson, err := json.MarshalIndent(si, "", " ") - if err != nil { - t.Fatal(err) - } - fmt.Println("sync simulation storage indexes", "iteration", i, "node", ni, "indexes", string(sijson)) - k := sim.MustNodeItem(n, simulation.BucketKeyKademlia).(*network.Kademlia) - fmt.Println("sync simulation kademlia", "iteration", i, "node", ni, "kademlia", k.String()) - x, err := insp.PeerStreams() - if err != nil { - t.Fatal(err) - } - xjson, err := json.MarshalIndent(x, "", " ") - if err != nil { - t.Fatal(err) - } - fmt.Println("sync simulation subscriptions", "iteration", i, "node", ni, "subscriptions", string(xjson)) - - binIDs := make(map[uint8]uint64) - n := sim.MustNodeItem(n, bucketKeyLocalStore).(chunk.Store) - for i := uint8(0); i <= chunk.MaxPO; i++ { - binIDs[i], err = n.LastPullSubscriptionBinID(i) - if err != nil { - t.Fatal(err) - } - } - binIDsjson, err := json.MarshalIndent(binIDs, "", " ") - if err != nil { - t.Fatal(err) - } - fmt.Println("sync simulation binids", "iteration", i, "node", ni, "binids", string(binIDsjson)) - } - } - fmt.Println("sync simulation retrieval", "iteration", i, "retrieval", time.Since(retrievalStart)) - fmt.Println("sync simulation done", "iteration", i, "duration", time.Since(startUpload)-api.InspectorIsPullSyncingTolerance) - } -} - -func newServiceFunc(bootnode bool, dbCapacity uint64) func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - return func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - config := api.NewConfig() - - config.BootnodeMode = bootnode - config.DbCapacity = dbCapacity - config.PushSyncEnabled = false - config.SyncEnabled = true - - dir, err := ioutil.TempDir("", "swarm-sync-test-node") - if err != nil { - return nil, nil, err - } - cleanup = func() { - err := os.RemoveAll(dir) - if err != nil { - log.Error("cleaning up swarm temp dir", "err", err) - } - } - - config.Path = dir - - privkey, err := crypto.GenerateKey() - if err != nil { - return nil, cleanup, err - } - nodekey, err := crypto.GenerateKey() - if err != nil { - return nil, cleanup, err - } - - config.Init(privkey, nodekey) - config.Port = "" - - sw, err := NewSwarm(config, nil) - if err != nil { - return nil, cleanup, err - } - bucket.Store(simulation.BucketKeyKademlia, sw.bzz.Hive.Kademlia) - bucket.Store(bucketKeyLocalStore, sw.netStore.Store) - bucket.Store(bucketKeyAPI, sw.api) - bucket.Store(bucketKeyInspector, sw.inspector) - fmt.Println("new swarm", "bzzKey", config.BzzKey, "baseAddr", fmt.Sprintf("%x", sw.bzz.BaseAddr())) - return sw, cleanup, nil - } -} - -func uploadRandomFile(t *testing.T, a *api.API, length int64) (chunk.Address, string) { - t.Helper() - - ctx := context.Background() - - hasher := md5.New() - - key, wait, err := a.Store( - ctx, - io.TeeReader(io.LimitReader(random, length), hasher), - length, - false, - ) - if err != nil { - t.Fatalf("store file: %v", err) - } - - if err := wait(ctx); err != nil { - t.Fatalf("wait for file to be stored: %v", err) - } - - return key, hex.EncodeToString(hasher.Sum(nil)) -} - -func storeFile(ctx context.Context, a *api.API, r io.Reader, length int64, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) { - key, wait, err := a.Store(ctx, r, length, toEncrypt) - if err != nil { - return nil, 
nil, err - } - return key, wait, nil -} - -func checkFile(t *testing.T, a *api.API, addr chunk.Address, checksum string) { - t.Helper() - - r, _ := a.Retrieve(context.Background(), addr) - - hasher := md5.New() - - n, err := io.Copy(hasher, r) - if err != nil { - t.Fatal(err) - } - - got := hex.EncodeToString(hasher.Sum(nil)) - - if got != checksum { - t.Fatalf("got file checksum %s (length %v), want %s", got, n, checksum) - } -} From 206d359434c2265aaed7d5f3a55d2c8d60d3cdcf Mon Sep 17 00:00:00 2001 From: acud Date: Mon, 11 Nov 2019 19:47:20 +0530 Subject: [PATCH 3/8] network, swarm, storage: address PR comments --- network/kademlia.go | 4 +++- storage/localstore/localstore.go | 18 +++++++++--------- storage/localstore/mode_put.go | 10 +++------- storage/localstore/mode_put_test.go | 2 +- swarm.go | 8 ++++---- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/network/kademlia.go b/network/kademlia.go index 39ac1df3c5..fbcbc0bef3 100644 --- a/network/kademlia.go +++ b/network/kademlia.go @@ -758,7 +758,9 @@ func (k *Kademlia) IsClosestTo(addr []byte, filter func(*BzzPeer) bool) (closest return closest } -func (k *Kademlia) IsAddressWithinDepth(addr []byte) bool { +// IsWithinDepth checks whether a given address falls within +// this node's saturation depth +func (k *Kademlia) IsWithinDepth(addr []byte) bool { depth := k.NeighbourhoodDepth() po, _ := Pof(addr, k.base, 0) diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go index efcf0092e0..837b2df0c7 100644 --- a/storage/localstore/localstore.go +++ b/storage/localstore/localstore.go @@ -118,7 +118,7 @@ type DB struct { // are done collectGarbageWorkerDone chan struct{} - putSetCheckFn func([]byte) bool + putToGCCheck func([]byte) bool } // Options struct holds optional parameters for configuring DB. @@ -138,7 +138,7 @@ type Options struct { // PutSetCheckFunc is a function called after a Put of a chunk // to verify whether that chunk needs to be Set and added to // garbage collection index too - PutSetCheckFunc func([]byte) bool + PutToGCCheck func([]byte) bool } // New returns a new DB. 
All fields and indexes are initialized @@ -148,14 +148,14 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if o == nil { // default options o = &Options{ - Capacity: defaultCapacity, - PutSetCheckFunc: func(_ []byte) bool { return false }, - } - } else { - if o.PutSetCheckFunc == nil { - o.PutSetCheckFunc = func(_ []byte) bool { return false } + Capacity: defaultCapacity, } } + + if o.PutToGCCheck == nil { + o.PutToGCCheck = func(_ []byte) bool { return false } + } + db = &DB{ capacity: o.Capacity, baseKey: baseKey, @@ -167,7 +167,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { collectGarbageTrigger: make(chan struct{}, 1), close: make(chan struct{}), collectGarbageWorkerDone: make(chan struct{}), - putSetCheckFn: o.PutSetCheckFunc, + putToGCCheck: o.PutToGCCheck, } if db.capacity <= 0 { db.capacity = defaultCapacity diff --git a/storage/localstore/mode_put.go b/storage/localstore/mode_put.go index 5c69835bd2..2dfbba8b17 100644 --- a/storage/localstore/mode_put.go +++ b/storage/localstore/mode_put.go @@ -42,10 +42,6 @@ func (db *DB) Put(ctx context.Context, mode chunk.ModePut, chs ...chunk.Chunk) ( if err != nil { metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) } - addrs := make([]chunk.Address, 0) - for _, c := range chs { - addrs = append(addrs, c.Address()) - } return exist, err } @@ -219,7 +215,7 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed db.pullIndex.PutInBatch(batch, item) db.pushIndex.PutInBatch(batch, item) - if db.putSetCheckFn(item.Address[:]) { + if db.putToGCCheck(item.Address[:]) { // TODO: this might result in an edge case where a node // that has very little storage and uploads using an anonymous @@ -255,7 +251,7 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I db.retrievalDataIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) - if db.putSetCheckFn(item.Address[:]) { + if db.putToGCCheck(item.Address[:]) { // TODO: this might result in an edge case where a node // that has very little storage and uploads using an anonymous @@ -271,7 +267,7 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I } // setGC is a helper function used to add chunks to the retrieval access -// index and the gc index in the cases that the putSetCheckFn condition +// index and the gc index in the cases that the putToGCCheck condition // vets a gc set. 
this is to mitigate index leakage in edge cases where // a chunk is added to a node's localstore and given that the chunk is // already within that node's NN (thus, it can be added to the gc index diff --git a/storage/localstore/mode_put_test.go b/storage/localstore/mode_put_test.go index c814cba5be..de66bb3b62 100644 --- a/storage/localstore/mode_put_test.go +++ b/storage/localstore/mode_put_test.go @@ -322,7 +322,7 @@ func TestModePut_sameChunk(t *testing.T) { // in the added chunk to show up in GC index func TestModePut_addToGc(t *testing.T) { // PutSetCheckFunc will tell localstore to always Set the chunks that enter - opts := &Options{PutSetCheckFunc: func(_ []byte) bool { return true }} + opts := &Options{PutToGCCheck: func(_ []byte) bool { return true }} for _, m := range []chunk.ModePut{ chunk.ModePutSync, chunk.ModePutUpload, diff --git a/swarm.go b/swarm.go index b1b04ebb79..0f98387795 100644 --- a/swarm.go +++ b/swarm.go @@ -212,10 +212,10 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e ) localStore, err := localstore.New(config.ChunkDbPath, config.BaseKey, &localstore.Options{ - MockStore: mockStore, - Capacity: config.DbCapacity, - Tags: self.tags, - PutSetCheckFunc: to.IsAddressWithinDepth, + MockStore: mockStore, + Capacity: config.DbCapacity, + Tags: self.tags, + PutToGCCheck: to.IsWithinDepth, }) if err != nil { return nil, err From 09203461ec5a8bf09159f8e2c8201a995acd2e87 Mon Sep 17 00:00:00 2001 From: acud Date: Tue, 12 Nov 2019 15:01:13 +0530 Subject: [PATCH 4/8] localstore: fix test cases --- storage/localstore/mode_put_test.go | 30 ++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/storage/localstore/mode_put_test.go b/storage/localstore/mode_put_test.go index de66bb3b62..4bdef34a95 100644 --- a/storage/localstore/mode_put_test.go +++ b/storage/localstore/mode_put_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethersphere/swarm/chunk" + "github.com/syndtr/goleveldb/leveldb" ) // TestModePutRequest validates ModePutRequest index values on the provided DB. @@ -321,15 +322,23 @@ func TestModePut_sameChunk(t *testing.T) { // TestModePutSync_addToGc validates ModePut* with PutSetCheckFunc stub results // in the added chunk to show up in GC index func TestModePut_addToGc(t *testing.T) { - // PutSetCheckFunc will tell localstore to always Set the chunks that enter - opts := &Options{PutToGCCheck: func(_ []byte) bool { return true }} - for _, m := range []chunk.ModePut{ - chunk.ModePutSync, - chunk.ModePutUpload, - chunk.ModePutRequest, + retVal := true + // PutSetCheckFunc's output is toggled from the test case + opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }} + + for _, m := range []struct { + mode chunk.ModePut + putToGc bool + }{ + {mode: chunk.ModePutSync, putToGc: true}, + {mode: chunk.ModePutSync, putToGc: false}, + {mode: chunk.ModePutUpload, putToGc: true}, + {mode: chunk.ModePutUpload, putToGc: false}, + {mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed } { for _, tc := range multiChunkTestCases { t.Run(tc.name, func(t *testing.T) { + retVal = m.putToGc db, cleanupFunc := newTestDB(t, opts) defer cleanupFunc() @@ -341,7 +350,7 @@ func TestModePut_addToGc(t *testing.T) { chunks := generateTestRandomChunks(tc.count) - _, err := db.Put(context.Background(), m, chunks...) + _, err := db.Put(context.Background(), m.mode, chunks...) 
if err != nil { t.Fatal(err) } @@ -351,8 +360,11 @@ func TestModePut_addToGc(t *testing.T) { for _, ch := range chunks { po := db.po(ch.Address()) binIDs[po]++ - - newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], nil)(t) + var wantErr error + if !m.putToGc { + wantErr = leveldb.ErrNotFound + } + newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], wantErr)(t) } }) } From 0eb5a7232035d6a968b3005b044e3b25aab959c4 Mon Sep 17 00:00:00 2001 From: acud Date: Tue, 12 Nov 2019 17:10:26 +0530 Subject: [PATCH 5/8] localstore: wip test gc set for existing item --- storage/localstore/mode_put.go | 25 ++++++++-- storage/localstore/mode_put_test.go | 71 +++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 4 deletions(-) diff --git a/storage/localstore/mode_put.go b/storage/localstore/mode_put.go index 2dfbba8b17..7cfc94258e 100644 --- a/storage/localstore/mode_put.go +++ b/storage/localstore/mode_put.go @@ -22,6 +22,7 @@ import ( "fmt" "time" + "github.com/davecgh/go-spew/spew" "github.com/ethereum/go-ethereum/metrics" "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/shed" @@ -203,6 +204,13 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed return false, 0, err } if exists { + if db.putToGCCheck(item.Address) { + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } + } + return true, 0, nil } @@ -215,7 +223,7 @@ func (db *DB) putUpload(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed db.pullIndex.PutInBatch(batch, item) db.pushIndex.PutInBatch(batch, item) - if db.putToGCCheck(item.Address[:]) { + if db.putToGCCheck(item.Address) { // TODO: this might result in an edge case where a node // that has very little storage and uploads using an anonymous @@ -240,7 +248,14 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I return false, 0, err } if exists { - return true, 0, nil + if db.putToGCCheck(item.Address) { + gcSizeChange, err = db.setGC(batch, item) + if err != nil { + return false, 0, err + } + } + + return true, gcSizeChange, nil } item.StoreTimestamp = now() @@ -251,8 +266,7 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I db.retrievalDataIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) - if db.putToGCCheck(item.Address[:]) { - + if db.putToGCCheck(item.Address) { // TODO: this might result in an edge case where a node // that has very little storage and uploads using an anonymous // upload will have some of the content GCd before being able @@ -279,16 +293,19 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e item.AccessTimestamp = i.AccessTimestamp db.gcIndex.DeleteInBatch(batch, item) gcSizeChange-- + spew.Dump("setGCTimestampRemoveItemStamp", item.AccessTimestamp) case leveldb.ErrNotFound: // the chunk is not accessed before default: return 0, err } item.AccessTimestamp = now() + spew.Dump("setGCTimestamp", item.AccessTimestamp) db.retrievalAccessIndex.PutInBatch(batch, item) db.gcIndex.PutInBatch(batch, item) gcSizeChange++ + spew.Dump("batch", batch) return gcSizeChange, nil } diff --git a/storage/localstore/mode_put_test.go b/storage/localstore/mode_put_test.go index 4bdef34a95..ff82b5d546 100644 --- a/storage/localstore/mode_put_test.go +++ b/storage/localstore/mode_put_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/davecgh/go-spew/spew" "github.com/ethersphere/swarm/chunk" "github.com/syndtr/goleveldb/leveldb" ) @@ -364,6 
+365,7 @@ func TestModePut_addToGc(t *testing.T) { if !m.putToGc { wantErr = leveldb.ErrNotFound } + newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp) newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po], wantErr)(t) } }) @@ -371,6 +373,75 @@ func TestModePut_addToGc(t *testing.T) { } } +// TestModePutSync_addToGcExisting validates ModePut* with PutSetCheckFunc stub results +// in the added chunk to show up in GC index +func TestModePut_addToGcExisting(t *testing.T) { + retVal := true + // PutSetCheckFunc's output is toggled from the test case + opts := &Options{PutToGCCheck: func(_ []byte) bool { return retVal }} + + for _, m := range []struct { + mode chunk.ModePut + putToGc bool + }{ + {mode: chunk.ModePutSync, putToGc: true}, + //{mode: chunk.ModePutSync, putToGc: false}, + //{mode: chunk.ModePutUpload, putToGc: true}, + //{mode: chunk.ModePutUpload, putToGc: false}, + //{mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed + } { + for _, tc := range multiChunkTestCases[:1] { + t.Run(tc.name, func(t *testing.T) { + retVal = m.putToGc + + db, cleanupFunc := newTestDB(t, opts) + defer cleanupFunc() + + wantStoreTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantStoreTimestamp + })() + spew.Dump("storeStamp", wantStoreTimestamp) + + chunks := generateTestRandomChunks(tc.count) + + _, err := db.Put(context.Background(), m.mode, chunks...) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1 * time.Millisecond) + // change the timestamp, put the chunks again and + // expect the access timestamp to change + wantAccessTimestamp := time.Now().UTC().UnixNano() + defer setNow(func() (t int64) { + return wantAccessTimestamp + })() + spew.Dump("updated stamp", wantAccessTimestamp) + + _, err = db.Put(context.Background(), m.mode, chunks...) + if err != nil { + t.Fatal(err) + } + + binIDs := make(map[uint8]uint64) + + for _, ch := range chunks { + po := db.po(ch.Address()) + binIDs[po]++ + var wantErr error + if !m.putToGc { + wantErr = leveldb.ErrNotFound + } + + newRetrieveIndexesTestWithAccess(db, ch, wantStoreTimestamp, wantAccessTimestamp) + newGCIndexTest(db, ch, wantStoreTimestamp, wantAccessTimestamp, binIDs[po], wantErr)(t) + } + }) + } + } +} + // TestPutDuplicateChunks validates the expected behaviour for // passing duplicate chunks to the Put method. 
func TestPutDuplicateChunks(t *testing.T) { From da7a16e09a788c663851dbd9ef7eb627c2f9feb9 Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Tue, 12 Nov 2019 13:39:58 +0100 Subject: [PATCH 6/8] storage/localstore: fix TestModePut_addToGcExisting --- storage/localstore/mode_put.go | 11 +++++++---- storage/localstore/mode_put_test.go | 13 +++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/storage/localstore/mode_put.go b/storage/localstore/mode_put.go index 7cfc94258e..18ff31c029 100644 --- a/storage/localstore/mode_put.go +++ b/storage/localstore/mode_put.go @@ -22,7 +22,6 @@ import ( "fmt" "time" - "github.com/davecgh/go-spew/spew" "github.com/ethereum/go-ethereum/metrics" "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/shed" @@ -287,25 +286,29 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I // already within that node's NN (thus, it can be added to the gc index // safely) func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) { + if item.BinID <= 0 { + i, err := db.retrievalDataIndex.Get(item) + if err != nil { + return 0, err + } + item.BinID = i.BinID + } i, err := db.retrievalAccessIndex.Get(item) switch err { case nil: item.AccessTimestamp = i.AccessTimestamp db.gcIndex.DeleteInBatch(batch, item) gcSizeChange-- - spew.Dump("setGCTimestampRemoveItemStamp", item.AccessTimestamp) case leveldb.ErrNotFound: // the chunk is not accessed before default: return 0, err } item.AccessTimestamp = now() - spew.Dump("setGCTimestamp", item.AccessTimestamp) db.retrievalAccessIndex.PutInBatch(batch, item) db.gcIndex.PutInBatch(batch, item) gcSizeChange++ - spew.Dump("batch", batch) return gcSizeChange, nil } diff --git a/storage/localstore/mode_put_test.go b/storage/localstore/mode_put_test.go index ff82b5d546..ce2277ae4a 100644 --- a/storage/localstore/mode_put_test.go +++ b/storage/localstore/mode_put_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/davecgh/go-spew/spew" "github.com/ethersphere/swarm/chunk" "github.com/syndtr/goleveldb/leveldb" ) @@ -385,12 +384,12 @@ func TestModePut_addToGcExisting(t *testing.T) { putToGc bool }{ {mode: chunk.ModePutSync, putToGc: true}, - //{mode: chunk.ModePutSync, putToGc: false}, - //{mode: chunk.ModePutUpload, putToGc: true}, - //{mode: chunk.ModePutUpload, putToGc: false}, - //{mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed + {mode: chunk.ModePutSync, putToGc: false}, + {mode: chunk.ModePutUpload, putToGc: true}, + {mode: chunk.ModePutUpload, putToGc: false}, + {mode: chunk.ModePutRequest, putToGc: true}, // in ModePutRequest we always insert to GC, so putToGc=false not needed } { - for _, tc := range multiChunkTestCases[:1] { + for _, tc := range multiChunkTestCases { t.Run(tc.name, func(t *testing.T) { retVal = m.putToGc @@ -401,7 +400,6 @@ func TestModePut_addToGcExisting(t *testing.T) { defer setNow(func() (t int64) { return wantStoreTimestamp })() - spew.Dump("storeStamp", wantStoreTimestamp) chunks := generateTestRandomChunks(tc.count) @@ -417,7 +415,6 @@ func TestModePut_addToGcExisting(t *testing.T) { defer setNow(func() (t int64) { return wantAccessTimestamp })() - spew.Dump("updated stamp", wantAccessTimestamp) _, err = db.Put(context.Background(), m.mode, chunks...) 
if err != nil { From ee5eaf31e2d0f0594c23513ff278bc899f90634e Mon Sep 17 00:00:00 2001 From: acud Date: Wed, 13 Nov 2019 14:13:19 +0530 Subject: [PATCH 7/8] localstore: fix typo --- storage/localstore/mode_put.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/localstore/mode_put.go b/storage/localstore/mode_put.go index 18ff31c029..3ea7c8aa4e 100644 --- a/storage/localstore/mode_put.go +++ b/storage/localstore/mode_put.go @@ -281,7 +281,7 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I // setGC is a helper function used to add chunks to the retrieval access // index and the gc index in the cases that the putToGCCheck condition -// vets a gc set. this is to mitigate index leakage in edge cases where +// warrants a gc set. this is to mitigate index leakage in edge cases where // a chunk is added to a node's localstore and given that the chunk is // already within that node's NN (thus, it can be added to the gc index // safely) From 4d75a998626f3b52cae0ac2f77eb260b6845f5ff Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Wed, 13 Nov 2019 10:56:48 +0100 Subject: [PATCH 8/8] storage/localstore: check only for bin id 0 --- storage/localstore/mode_put.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/localstore/mode_put.go b/storage/localstore/mode_put.go index 3ea7c8aa4e..bfca64e60c 100644 --- a/storage/localstore/mode_put.go +++ b/storage/localstore/mode_put.go @@ -286,7 +286,7 @@ func (db *DB) putSync(batch *leveldb.Batch, binIDs map[uint8]uint64, item shed.I // already within that node's NN (thus, it can be added to the gc index // safely) func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, err error) { - if item.BinID <= 0 { + if item.BinID == 0 { i, err := db.retrievalDataIndex.Get(item) if err != nil { return 0, err
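
Taken together, the series has localstore accept a GC-admission hook and has swarm.go pass Kademlia's depth check into it. Below is a minimal sketch of that wiring in isolation, based only on the hunks above (localstore.Options.PutToGCCheck, localstore.New, network.NewKademlia, network.NewKadParams and Kademlia.IsWithinDepth); the database path and base key are placeholder values, and error handling is reduced to a panic for brevity:

package main

import (
	"github.com/ethersphere/swarm/network"
	"github.com/ethersphere/swarm/storage/localstore"
)

func main() {
	baseKey := make([]byte, 32) // placeholder overlay address

	// Build the Kademlia first so that its depth check can be handed to
	// the localstore as the PutToGCCheck hook; this mirrors the swarm.go
	// hunk that moves the kademlia construction above localstore.New.
	kad := network.NewKademlia(baseKey, network.NewKadParams())

	db, err := localstore.New("/tmp/chunkdb", baseKey, &localstore.Options{
		// Chunks whose address falls within this node's neighbourhood
		// depth are added to the gc index directly on Put, instead of
		// waiting for a later Set.
		PutToGCCheck: kad.IsWithinDepth,
	})
	if err != nil {
		panic(err)
	}
	defer db.Close()
}

When the hook is left nil, New falls back to func(_ []byte) bool { return false } (the default installed in the localstore.go hunk of patch 3), so callers that do not set PutToGCCheck keep the previous Put behavior.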