swarm/storage/localstore: new localstore package #19015

Merged
merged 81 commits into from Feb 7, 2019
Commits
d8acb12
swarm/storage/localstore: most basic database
janos Nov 29, 2018
9ec535a
swarm/storage/localstore: fix typos and comments
janos Dec 3, 2018
37205de
swarm/shed: add uint64 field Dec and DecInBatch methods
janos Dec 3, 2018
b1ded5a
swarm/storage/localstore: decrement size counter on ModeRemoval update
janos Dec 3, 2018
572f3cb
swarm/storage/localstore: unexport modeAccess and modeRemoval
janos Dec 3, 2018
cbb510b
swarm/storage/localstore: add WithRetrievalCompositeIndex
janos Dec 3, 2018
c7beb22
swarm/storage/localstore: add TestModeSyncing
janos Dec 3, 2018
391faa7
swarm/storage/localstore: fix test name
janos Dec 3, 2018
58f3f86
swarm/storage/localstore: add TestModeUpload
janos Dec 3, 2018
2d928bf
swarm/storage/localstore: add TestModeRequest
janos Dec 3, 2018
4d58a6f
swarm/storage/localstore: add TestModeSynced
janos Dec 4, 2018
af1b137
swarm/storage/localstore: add TestModeAccess
janos Dec 4, 2018
96409ff
swarm/storage/localstore: add TestModeRemoval
janos Dec 4, 2018
b782bfe
swarm/storage/localstore: add mock store option for chunk data
janos Dec 5, 2018
35376d8
swarm/storage/localstore: add TestDB_pullIndex
janos Dec 5, 2018
e6a7196
swarm/storage/localstore: add TestDB_gcIndex
janos Dec 6, 2018
58c7f11
swarm/storage/localstore: change how batches are written
janos Dec 6, 2018
6e8b2ad
swarm/storage/localstore: add updateOnAccess function
janos Dec 6, 2018
7b8510e
swarm/storage/localhost: add DB.gcSize
janos Dec 6, 2018
cf3ec30
swarm/storage/localstore: update comments
janos Dec 7, 2018
d58e1ee
swarm/storage/localstore: add BenchmarkNew
janos Dec 7, 2018
f2299f4
swarm/storage/localstore: add retrieval tests benchmarks
janos Dec 7, 2018
e6bdda7
swarm/storage/localstore: accessors redesign
janos Dec 13, 2018
b6f5b7a
swarm/storage/localstore: add semaphore for updateGC goroutine
janos Dec 13, 2018
5732c38
swarm/storage/localstore: implement basic garbage collection
janos Dec 13, 2018
ac9d153
swarm/storage/localstore: optimize collectGarbage
janos Dec 14, 2018
e6e29f5
swarm/storage/localstore: add more garbage collection tests cases
janos Dec 14, 2018
750268d
swarm/shed, swarm/storage/localstore: rename IndexItem to Item
janos Dec 17, 2018
a486a09
swarm/shed: add Index.CountFrom
janos Dec 18, 2018
e17fec2
swarm/storage/localstore: persist gcSize
janos Dec 18, 2018
5605323
swarm/storage/localstore: remove composite retrieval index
janos Dec 18, 2018
5ebbcb5
swarm/shed: IterateWithPrefix and IterateWithPrefixFrom Index functions
janos Dec 19, 2018
80c269b
swarm/storage/localstore: writeGCSize function with leveldb batch
janos Dec 19, 2018
bca85ef
swarm/storage/localstore: unexport modeSetRemove
janos Dec 19, 2018
cee4004
swarm/storage/localstore: update writeGCSizeWorker comment
janos Dec 19, 2018
b5a0fd2
swarm/storage/localstore: add triggerGarbageCollection function
janos Dec 19, 2018
4fca004
swarm/storage/localstore: call writeGCSize on DB Close
janos Dec 19, 2018
5ad0c8d
swarm/storage/localstore: additional comment in writeGCSizeWorker
janos Dec 19, 2018
cb8078e
Merge branch 'master' into localstore
janos Dec 19, 2018
c9f5130
swarm/storage/localstore: add MetricsPrefix option
janos Dec 19, 2018
da9bab4
swarm/storage/localstore: fix a typo
janos Dec 19, 2018
d54d7ae
swamr/shed: only one Index Iterate function
janos Dec 19, 2018
67473be
swarm/storage/localstore: use shed Iterate function
janos Dec 19, 2018
2299147
swarm/shed: pass a new byte slice copy to index decode functions
janos Dec 20, 2018
5488a2b
swarm/storage/localstore: implement feed subscriptions
janos Dec 20, 2018
38bdf7f
swarm/storage/localstore: add more subscriptions tests
janos Dec 21, 2018
a74eae2
swarm/storage/localsore: add parallel upload test
janos Dec 21, 2018
534009f
swarm/storage/localstore: use storage.MaxPO in subscription tests
janos Dec 21, 2018
5edd22d
swarm/storage/localstore: subscription of addresses instead chunks
janos Dec 21, 2018
95726d7
swarm/storage/localstore: lock item address in collectGarbage iterator
janos Dec 21, 2018
c5dcae3
swarm/storage/localstore: fix TestSubscribePull to include MaxPO
janos Dec 22, 2018
f3380ea
swarm/storage/localstore: improve subscriptions
janos Jan 7, 2019
1d2d470
swarm/storage/localstore: add TestDB_SubscribePull_sinceAndUntil test
janos Jan 7, 2019
c26c979
swarm/storage/localstore: adjust pull sync tests
janos Jan 7, 2019
c188281
Merge branch 'master' into localstore
janos Jan 7, 2019
c5e4c61
swarm/storage/localstore: remove writeGCSizeDelay and use literal
janos Jan 8, 2019
015d977
swarm/storage/localstore: adjust subscriptions tests delays and comments
janos Jan 8, 2019
abbb4a6
swarm/storage/localstore: add godoc package overview
janos Jan 10, 2019
fb0a822
swarm/storage/localstore: fix a typo
janos Jan 10, 2019
c423060
swarm/storage/localstore: update package overview
janos Jan 10, 2019
c5a6456
swarm/storage/localstore: remove repeated index change
janos Jan 11, 2019
33726a4
swarm/storage/localstore: rename ChunkInfo to ChunkDescriptor
janos Jan 11, 2019
25a068a
swarm/storage/localstore: add comment in collectGarbageWorker
janos Jan 11, 2019
ca1e24f
swarm/storage/localstore: replace atomics with mutexes for gcSize and…
janos Jan 11, 2019
c62fee3
Merge branch 'master' into localstore
janos Jan 14, 2019
5ddc75f
swarm/storage/localstore: protect addrs map in pull subs tests
janos Jan 14, 2019
87bbd61
swarm/storage/localstore: protect slices in push subs test
janos Jan 14, 2019
6ad67d7
swarm/storage/localstore: protect chunks in TestModePutUpload_parallel
janos Jan 14, 2019
a550388
swarm/storage/localstore: fix a race in TestDB_updateGCSem defers
janos Jan 14, 2019
1dae999
swarm/storage/localstore: remove parallel flag from tests
janos Jan 14, 2019
eda338a
swarm/storage/localstore: fix a race in testDB_collectGarbageWorker
janos Jan 14, 2019
ad5b329
swarm/storage/localstore: remove unused code
janos Jan 22, 2019
8d15e82
swarm/storage/localstore: add more context to pull sub log messages
janos Jan 25, 2019
3948044
swarm/storage/localstore: merge branch 'master' into localstore
janos Jan 25, 2019
6c8208a
swarm/storage/localstore: BenchmarkPutUpload and global lock option
janos Jan 26, 2019
6accc6b
swarm/storage/localstore: pre-generate chunks in BenchmarkPutUpload
janos Jan 28, 2019
85cd349
swarm/storage/localstore: correct useGlobalLock in collectGarbage
janos Jan 28, 2019
7fa1ba9
swarm/storage/localstore: fix typos and update comments
janos Jan 29, 2019
ebecd05
swarm/storage/localstore: update writeGCSize comment
janos Jan 29, 2019
f056e86
swarm/storage/localstore: remove global lock option
janos Feb 4, 2019
40432d9
swarm/storage/localstore: add description for gc size counting
janos Feb 5, 2019
56 changes: 56 additions & 0 deletions swarm/storage/localstore/doc.go
@@ -0,0 +1,56 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

/*
Package localstore provides a disk storage layer for Swarm Chunk persistence.
It uses swarm/shed abstractions on top of the github.com/syndtr/goleveldb
LevelDB implementation.

The main type is DB which manages the storage by providing methods to
access and add Chunks and to manage their status.

Modes are abstractions that perform specific changes on Chunks. There are three
mode types:

- ModeGet, for Chunk access
- ModePut, for adding Chunks to the database
- ModeSet, for changing Chunk statuses

Every mode type has a corresponding type (Getter, Putter and Setter)
that provides the appropriate method to perform the operation. That type
should be injected into localstore consumers instead of the whole DB.
This makes it clearer which operations a consumer performs
on the database.

Getters, Putters and Setters accept different get, put and set modes
to perform different actions. For example, ModeGet has two different
variables, ModeGetRequest and ModeGetSync, and two different Getters
can be constructed with them. One is used when the chunk is requested
and the other when the chunk is synced, as these two events change
the database differently.

Subscription methods are implemented for the specific purpose of
continuous iteration over Chunks that should be provided to
Push and Pull syncing.

DB implements an internal garbage collector that removes only synced
Chunks from the database based on their most recent access time.

Internally, DB stores Chunk data and any required information, such as
store and access timestamps, in different shed indexes that can be
iterated on by the garbage collector or subscriptions.
*/
package localstore
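
The Getter idea from the package overview can be illustrated with a minimal, self-contained sketch. It is not the package's implementation: the in-memory DB, the string addresses, the accesses map and the Put method are hypothetical stand-ins, and the assumption that only ModeGetRequest records an access is made here for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ModeGet selects how a read affects the database (hypothetical stand-in).
type ModeGet int

const (
	// ModeGetRequest: the chunk is requested, so the access is recorded.
	ModeGetRequest ModeGet = iota
	// ModeGetSync: the chunk is read for syncing; assumed here to leave
	// access information untouched.
	ModeGetSync
)

// ErrNotFound is returned when no chunk is stored under an address.
var ErrNotFound = errors.New("chunk not found")

// DB is a toy in-memory store standing in for the LevelDB-backed DB.
type DB struct {
	mu       sync.Mutex
	data     map[string][]byte
	accesses map[string]int // number of recorded accesses per address
}

// NewDB constructs an empty toy database.
func NewDB() *DB {
	return &DB{data: make(map[string][]byte), accesses: make(map[string]int)}
}

// Put stores chunk data under an address.
func (db *DB) Put(addr string, data []byte) {
	db.mu.Lock()
	defer db.mu.Unlock()
	db.data[addr] = data
}

// Getter exposes only the read operation, bound to one mode.
type Getter struct {
	db   *DB
	mode ModeGet
}

// NewGetter returns a Getter for the given mode.
func (db *DB) NewGetter(mode ModeGet) *Getter {
	return &Getter{db: db, mode: mode}
}

// Get returns chunk data and applies the side effect of the Getter's mode.
func (g *Getter) Get(addr string) ([]byte, error) {
	g.db.mu.Lock()
	defer g.db.mu.Unlock()
	data, ok := g.db.data[addr]
	if !ok {
		return nil, ErrNotFound
	}
	if g.mode == ModeGetRequest {
		g.db.accesses[addr]++
	}
	return data, nil
}

func main() {
	db := NewDB()
	db.Put("addr-1", []byte("chunk data"))

	// A consumer that only reads receives a Getter, not the whole DB.
	requestGetter := db.NewGetter(ModeGetRequest)
	syncGetter := db.NewGetter(ModeGetSync)

	if _, err := syncGetter.Get("addr-1"); err != nil {
		fmt.Println("sync get:", err)
	}
	if _, err := requestGetter.Get("addr-1"); err != nil {
		fmt.Println("request get:", err)
	}
	fmt.Println("recorded accesses:", db.accesses["addr-1"])
}
```

Handing a consumer a Getter rather than the DB makes the allowed operation visible in the type it receives, which is the point the overview makes about injecting Getters, Putters and Setters.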
302 changes: 302 additions & 0 deletions swarm/storage/localstore/gc.go
@@ -0,0 +1,302 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

/*
Counting number of items in garbage collection index

The number of items in the garbage collection index is not the same as the
number of chunks in the retrieval index (the total number of stored chunks). A
chunk can be garbage collected only when it is set to a synced state by
ModeSetSync, and only then is it counted into the garbage collection size, which
determines whether a number of chunks should be removed from the storage by the
garbage collection. This opens a possibility that the storage size exceeds the
limit if files are locally uploaded and the node is not connected to other nodes
or there is a problem with syncing.

Tracking of garbage collection size (gcSize) is focused on performance. Key
points:

1. counting the number of key/value pairs in LevelDB takes around 0.7s for 1e6
pairs on a very fast ssd (an unacceptably long time in reality)
2. locking leveldb batch writes with a global mutex (serial batch writes) is
not acceptable, we should use locking per chunk address

Because of point 1. we cannot count the number of items in the garbage
collection index in the New constructor, as it could take very long in realistic
scenarios where the limit is 5e6 and nodes are running on slower hdd disks or
cloud providers with low IOPS.

Point 2. is a performance optimization to allow parallel batch writes with
getters, putters and setters. Every single batch that they create contains only
information related to a single chunk, with no relations to other chunks or
shared statistical data (like gcSize). This approach avoids race conditions on
writing batches in parallel, but creates a problem of synchronizing statistical
data values like gcSize. With a global mutex lock, any data could be written by
any batch, but that would not utilize the full potential of leveldb parallel
writes.

To mitigate these two problems, the implementation of counting and persisting
gcSize is split into two parts. One is the in-memory value (gcSize) that is fast
to read and write with a dedicated mutex (gcSizeMu) if the batch which adds or
removes items from the garbage collection index is successful. The second part
is the reliable persistence of this value to the leveldb database, as the
storedGCSize field. This database field is saved by the writeGCSizeWorker and
writeGCSize functions when the in-memory gcSize variable is changed, but not too
often, to avoid very frequent database writes. These database writes are
triggered by writeGCSizeTrigger when a call is made to the function incGCSize.
The trigger ensures that database writes are done only when gcSize is changed
(contrary to simpler periodic writes or checks). A backoff of 10s in
writeGCSizeWorker ensures that batch writes are not made too frequently. Saving
the storedGCSize in the database Close function ensures that the in-memory
gcSize is persisted when the database is closed.

This persistence must be resilient to failures like panics. For this purpose, a
collection of hashes that are added to the garbage collection index, but not yet
persisted to storedGCSize, must be tracked so that they can be counted in when
the DB is constructed again with the New function after the failure (swarm node
restarts). On every batch write that adds a new item to the garbage collection
index, the same hash is added to gcUncountedHashesIndex. This ensures that there
is persisted information about which hashes were added to the garbage collection
index. When the storedGCSize is saved by the writeGCSize function, these values
are removed in the same batch in which storedGCSize is changed, to ensure
consistency. When a panic happens, or the database Close method is not called,
the database storage still contains all the information needed to reliably and
efficiently get the correct number of items in the garbage collection index.
This is performed in the New function, where all hashes in
gcUncountedHashesIndex are counted, added to the storedGCSize and saved to the
disk before the database is constructed again. The gcUncountedHashesIndex index
acts as a dirty bit for recovery that provides information about what needs to
be corrected. With a simple dirty bit, the whole garbage collection index would
have to be counted on recovery instead of only the items in
gcUncountedHashesIndex. Because of the triggering mechanism of writeGCSizeWorker
and the relatively short backoff time, the number of hashes in
gcUncountedHashesIndex should be low and it should take a very short time to
recover from the previous failure. If there was no failure and
gcUncountedHashesIndex is empty, which is the usual case, the New function will
take minimal time to return.
*/

package localstore

import (
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/swarm/shed"
	"github.com/syndtr/goleveldb/leveldb"
)

var (
	// gcTargetRatio defines the target number of items
	// in garbage collection index that will not be removed
	// on garbage collection. The target number of items
	// is calculated by gcTarget function. This value must be
	// in range (0,1]. For example, with 0.9 value,
	// garbage collection will leave 90% of defined capacity
	// in database after its run. This prevents frequent
	// garbage collection runs.
	gcTargetRatio = 0.9
	// gcBatchSize limits the number of chunks in a single
	// leveldb batch on garbage collection.
	gcBatchSize int64 = 1000
)

// collectGarbageWorker is a long running function that waits for
// collectGarbageTrigger channel to signal a garbage collection
// run. GC run iterates on gcIndex and removes older items
// from retrieval and other indexes.
func (db *DB) collectGarbageWorker() {
	for {
		select {
		case <-db.collectGarbageTrigger:
			// run a single collect garbage run and
			// if done is false, gcBatchSize is reached and
			// another collect garbage run is needed
			collectedCount, done, err := db.collectGarbage()
			if err != nil {
				log.Error("localstore collect garbage", "err", err)
			}
			// check if another gc run is needed
			if !done {
				db.triggerGarbageCollection()
			}

			if testHookCollectGarbage != nil {
				testHookCollectGarbage(collectedCount)
			}
		case <-db.close:
			return
		}
	}
}

// collectGarbage removes chunks from retrieval and other
// indexes if maximal number of chunks in database is reached.
// This function returns the number of removed chunks. If done
// is false, another call to this function is needed to collect
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) {
	batch := new(leveldb.Batch)
	target := db.gcTarget()

	done = true
	err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
		// protect parallel updates
		unlock, err := db.lockAddr(item.Address)
		if err != nil {
			return false, err
		}
		defer unlock()

		gcSize := db.getGCSize()
		if gcSize-collectedCount <= target {
			return true, nil
		}
		// delete from retrieve, pull, gc
		db.retrievalDataIndex.DeleteInBatch(batch, item)
		db.retrievalAccessIndex.DeleteInBatch(batch, item)
		db.pullIndex.DeleteInBatch(batch, item)
		db.gcIndex.DeleteInBatch(batch, item)
		collectedCount++
		if collectedCount >= gcBatchSize {
			// batch size limit reached,
			// another gc run is needed
			done = false
			return true, nil
		}
		return false, nil
	}, nil)
	if err != nil {
		return 0, false, err
	}

	err = db.shed.WriteBatch(batch)
	if err != nil {
		return 0, false, err
	}
	// batch is written, decrement gcSize
	db.incGCSize(-collectedCount)
	return collectedCount, done, nil
}

// gcTarget returns the absolute value for garbage collection
// target value, calculated from db.capacity and gcTargetRatio.
func (db *DB) gcTarget() (target int64) {
	return int64(float64(db.capacity) * gcTargetRatio)
}

// incGCSize increments gcSize by the provided number.
// If count is negative, it will decrement gcSize.
func (db *DB) incGCSize(count int64) {
	if count == 0 {
		return
	}

	db.gcSizeMu.Lock()
	new := db.gcSize + count
	db.gcSize = new
	db.gcSizeMu.Unlock()

	select {
	case db.writeGCSizeTrigger <- struct{}{}:
	default:
	}
	if new >= db.capacity {
		db.triggerGarbageCollection()
	}
}

// getGCSize returns gcSize value by locking it
// with gcSizeMu mutex.
func (db *DB) getGCSize() (count int64) {
	db.gcSizeMu.RLock()
	count = db.gcSize
	db.gcSizeMu.RUnlock()
	return count
}

// triggerGarbageCollection signals collectGarbageWorker
// to call collectGarbage.
func (db *DB) triggerGarbageCollection() {
	select {
	case db.collectGarbageTrigger <- struct{}{}:
	case <-db.close:
	default:
	}
}

// writeGCSizeWorker writes gcSize on trigger event
// and waits 10 seconds after each write.
// This fixed delay acts as a backoff that avoids
// very frequent database operations.
func (db *DB) writeGCSizeWorker() {
	for {
		select {
		case <-db.writeGCSizeTrigger:
			err := db.writeGCSize(db.getGCSize())
			if err != nil {
				log.Error("localstore write gc size", "err", err)
			}
			// Wait some time before writing gc size in the next
			// iteration. This prevents frequent I/O operations.
			select {
			case <-time.After(10 * time.Second):
			case <-db.close:
				return
			}
		case <-db.close:
			return
		}
	}
}

// writeGCSize stores the number of items in gcIndex.
// It removes all hashes from gcUncountedHashesIndex
// not to include them on the next DB initialization
// (New function) when gcSize is counted.
func (db *DB) writeGCSize(gcSize int64) (err error) {
	const maxBatchSize = 1000

	batch := new(leveldb.Batch)
	db.storedGCSize.PutInBatch(batch, uint64(gcSize))
	batchSize := 1

	// use only one iterator as it acquires its snapshot
	// not to remove hashes from index that are added
	// after stored gc size is written
	err = db.gcUncountedHashesIndex.Iterate(func(item shed.Item) (stop bool, err error) {
		db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
		batchSize++
		if batchSize >= maxBatchSize {
			err = db.shed.WriteBatch(batch)
			if err != nil {
				return false, err
			}
			batch.Reset()
			batchSize = 0
		}
		return false, nil
	}, nil)
	if err != nil {
		return err
	}
	return db.shed.WriteBatch(batch)
}

// testHookCollectGarbage is a hook that can provide
// information when a garbage collection run is done
// and how many items it removed.
var testHookCollectGarbage func(collectedCount int64)
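
The recovery scheme described in the gc.go header comment (a persisted count plus an index of not yet counted hashes) can be sketched in isolation. The following is a simplified illustration under stated assumptions, not the code of the New function, which is not part of this file: the store type, its map-based uncounted set and the method names addToGC and recoverGCSize are hypothetical, each method stands in for what the package does in one atomic LevelDB batch, and removals from the garbage collection index are ignored.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for the persisted state:
// the storedGCSize field and the gcUncountedHashesIndex index.
type store struct {
	storedGCSize uint64              // last persisted item count
	uncounted    map[string]struct{} // hashes added since the last persist
}

func newStore() *store {
	return &store{uncounted: make(map[string]struct{})}
}

// addToGC models a batch write that adds a chunk to the gc index:
// the hash is recorded as uncounted in the same batch.
func (s *store) addToGC(hash string) {
	s.uncounted[hash] = struct{}{}
}

// writeGCSize models persisting the in-memory count. The uncounted
// hashes are cleared together with the update of the stored count,
// so the two values stay consistent.
func (s *store) writeGCSize(gcSize uint64) {
	s.storedGCSize = gcSize
	s.uncounted = make(map[string]struct{})
}

// recoverGCSize models start-up after a failure: only the uncounted
// hashes are counted and added to the stored count, instead of
// iterating over the whole gc index.
func (s *store) recoverGCSize() uint64 {
	s.storedGCSize += uint64(len(s.uncounted))
	s.uncounted = make(map[string]struct{})
	return s.storedGCSize
}

func main() {
	s := newStore()
	for _, h := range []string{"a", "b", "c"} {
		s.addToGC(h)
	}
	s.writeGCSize(3) // periodic persist by the worker

	// Two more chunks become collectable, then the process fails
	// before the next persist: the in-memory count is lost.
	s.addToGC("d")
	s.addToGC("e")

	fmt.Println("recovered gc size:", s.recoverGCSize())
}
```

The cost of recovery in this sketch is proportional to the number of uncounted hashes only, which matches the comment's argument that start-up stays fast when the persist backoff is short.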