Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove memory allocations from merkledb iteration #2925

Merged
merged 24 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
88 changes: 55 additions & 33 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,10 +928,10 @@ func (db *merkleDB) commitBatch(ops []database.BatchOp) error {
return view.commitToDB(context.Background())
}

// commitChanges commits the changes in [trieToCommit] to [db].
// commitView commits the changes in [trieToCommit] to [db].
// Assumes [trieToCommit]'s node IDs have been calculated.
// Assumes [db.commitLock] is held.
func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error {
func (db *merkleDB) commitView(ctx context.Context, trieToCommit *view) error {
db.lock.Lock()
defer db.lock.Unlock()

Expand All @@ -949,7 +949,7 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error
}

changes := trieToCommit.changes
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitChanges", oteltrace.WithAttributes(
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitView", oteltrace.WithAttributes(
attribute.Int("nodesChanged", len(changes.nodes)),
attribute.Int("valuesChanged", len(changes.values)),
))
Expand All @@ -965,8 +965,46 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error
return nil
}

currentValueNodeBatch := db.valueNodeDB.NewBatch()
_, nodesSpan := db.infoTracer.Start(ctx, "MerkleDB.commitChanges.writeNodes")
valueNodeBatch := db.baseDB.NewBatch()
if err := db.applyChanges(ctx, valueNodeBatch, changes); err != nil {
return err
}

if err := db.commitValueChanges(ctx, valueNodeBatch); err != nil {
return err
}

db.history.record(changes)

// Update root in database.
db.root = changes.rootChange.after
db.rootID = changes.rootID
return nil
}

// moveChildViewsToDB detaches every child view from [trieToCommit] and
// re-parents it onto [db], so the child views stay valid after the commit.
//
// assumes [db.lock] is held
func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) {
	trieToCommit.validityTrackingLock.Lock()
	defer trieToCommit.validityTrackingLock.Unlock()

	for _, child := range trieToCommit.childViews {
		// Point the child at the db rather than the soon-to-be-stale view.
		child.updateParent(db)
		db.childViews = append(db.childViews, child)
	}
	trieToCommit.childViews = make([]*view, 0, defaultPreallocationSize)
}

// applyChanges takes the [changes] and applies them to [db.intermediateNodeDB]
// and [valueNodeBatch].
//
// assumes [db.lock] is held
func (db *merkleDB) applyChanges(ctx context.Context, valueNodeBatch database.KeyValueWriterDeleter, changes *changeSummary) error {
_, span := db.infoTracer.Start(ctx, "MerkleDB.applyChanges")
defer span.End()

for key, nodeChange := range changes.nodes {
shouldAddIntermediate := nodeChange.after != nil && !nodeChange.after.hasValue()
shouldDeleteIntermediate := !shouldAddIntermediate && nodeChange.before != nil && !nodeChange.before.hasValue()
Expand All @@ -976,50 +1014,34 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error

if shouldAddIntermediate {
if err := db.intermediateNodeDB.Put(key, nodeChange.after); err != nil {
nodesSpan.End()
return err
}
} else if shouldDeleteIntermediate {
if err := db.intermediateNodeDB.Delete(key); err != nil {
nodesSpan.End()
return err
}
}

if shouldAddValue {
currentValueNodeBatch.Put(key, nodeChange.after)
if err := db.valueNodeDB.Write(valueNodeBatch, key, nodeChange.after); err != nil {
return err
}
} else if shouldDeleteValue {
currentValueNodeBatch.Delete(key)
if err := db.valueNodeDB.Write(valueNodeBatch, key, nil); err != nil {
return err
}
}
}
nodesSpan.End()

_, commitSpan := db.infoTracer.Start(ctx, "MerkleDB.commitChanges.valueNodeDBCommit")
err := currentValueNodeBatch.Write()
commitSpan.End()
if err != nil {
return err
}

db.history.record(changes)

// Update root in database.
db.root = changes.rootChange.after
db.rootID = changes.rootID
return nil
}

// moveChildViewsToDB removes any child views from the trieToCommit and moves them to the db
// assumes [db.lock] is held
func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) {
trieToCommit.validityTrackingLock.Lock()
defer trieToCommit.validityTrackingLock.Unlock()
// commitValueChanges is a thin wrapper around [valueNodeBatch.Write()] to
// provide tracing.
func (db *merkleDB) commitValueChanges(ctx context.Context, valueNodeBatch database.Batch) error {
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitValueChanges")
defer span.End()

for _, childView := range trieToCommit.childViews {
childView.updateParent(db)
db.childViews = append(db.childViews, childView)
}
trieToCommit.childViews = make([]*view, 0, defaultPreallocationSize)
return valueNodeBatch.Write()
}

// CommitToDB is a no-op for db since it is already in sync with itself.
Expand Down
72 changes: 71 additions & 1 deletion x/merkledb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package merkledb
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/rand"
"slices"
Expand Down Expand Up @@ -39,7 +40,7 @@ func newDB(ctx context.Context, db database.Database, config Config) (*merkleDB,

func newDefaultConfig() Config {
return Config{
IntermediateWriteBatchSize: 10,
IntermediateWriteBatchSize: 256 * units.KiB,
HistoryLength: defaultHistoryLength,
ValueNodeCacheSize: units.MiB,
IntermediateNodeCacheSize: units.MiB,
Expand Down Expand Up @@ -1330,3 +1331,72 @@ func TestCrashRecovery(t *testing.T) {
require.NoError(err)
require.Equal(expectedRoot, rootAfterRecovery)
}

// BenchmarkCommitView measures applying a precomputed view's changes to the
// database and flushing the resulting value-node batch.
func BenchmarkCommitView(b *testing.B) {
	db, err := getBasicDB()
	require.NoError(b, err)

	const numOps = 1_000
	ops := make([]database.BatchOp, numOps)
	for i := 0; i < numOps; i++ {
		key := binary.AppendUvarint(nil, uint64(i))
		ops[i] = database.BatchOp{
			Key:   key,
			Value: hashing.ComputeHash256(key),
		}
	}

	ctx := context.Background()
	viewIntf, err := db.NewView(ctx, ViewChanges{BatchOps: ops})
	require.NoError(b, err)

	view := viewIntf.(*view)
	require.NoError(b, view.applyValueChanges(ctx))

	b.Run("apply and commit changes", func(b *testing.B) {
		require := require.New(b)

		for i := 0; i < b.N; i++ {
			// Swap in a fresh backing store so iterations don't observe
			// each other's writes.
			db.baseDB = memdb.New()

			batch := db.baseDB.NewBatch()
			require.NoError(db.applyChanges(ctx, batch, view.changes))
			require.NoError(db.commitValueChanges(ctx, batch))
		}
	})
}

// BenchmarkIteration measures iterator construction and a full traversal of a
// database populated with 1,000 key-value pairs.
func BenchmarkIteration(b *testing.B) {
	db, err := getBasicDB()
	require.NoError(b, err)

	ops := make([]database.BatchOp, 1_000)
	for i := range ops {
		key := binary.AppendUvarint(nil, uint64(i))
		ops[i] = database.BatchOp{
			Key:   key,
			Value: hashing.ComputeHash256(key),
		}
	}

	ctx := context.Background()
	view, err := db.NewView(ctx, ViewChanges{BatchOps: ops})
	require.NoError(b, err)
	require.NoError(b, view.CommitToDB(ctx))

	b.Run("create iterator", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			// Construction cost only: release immediately.
			db.NewIterator().Release()
		}
	})

	b.Run("iterate", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			it := db.NewIterator()
			for it.Next() {
			}
			it.Release()
		}
	})
}
98 changes: 38 additions & 60 deletions x/merkledb/value_node_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
package merkledb

import (
"errors"

"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/utils"
)

const defaultBatchOpsLength = 256
var (
_ database.Iterator = (*iterator)(nil)

var _ database.Iterator = (*iterator)(nil)
errNodeMissingValue = errors.New("valueNodeDB contains node without a value")
)

type valueNodeDB struct {
bufferPool *utils.BytesPool
Expand Down Expand Up @@ -42,6 +46,18 @@ func newValueNodeDB(
}
}

// Write records [n] under [key] in [batch], updating the node cache to match.
// A nil [n] is treated as a deletion of [key].
func (db *valueNodeDB) Write(batch database.KeyValueWriterDeleter, key Key, n *node) error {
	db.metrics.DatabaseNodeWrite()
	db.nodeCache.Put(key, n)

	// The prefixed key buffer is pooled; return it once the batch op is staged.
	prefixedKey := addPrefixToKey(db.bufferPool, valueNodePrefix, key.Bytes())
	defer db.bufferPool.Put(prefixedKey)

	if n != nil {
		return batch.Put(*prefixedKey, n.bytes())
	}
	return batch.Delete(*prefixedKey)
}

func (db *valueNodeDB) newIteratorWithStartAndPrefix(start, prefix []byte) database.Iterator {
prefixedStart := addPrefixToKey(db.bufferPool, valueNodePrefix, start)
defer db.bufferPool.Put(prefixedStart)
Expand All @@ -59,13 +75,6 @@ func (db *valueNodeDB) Close() {
db.closed.Set(true)
}

func (db *valueNodeDB) NewBatch() *valueNodeBatch {
return &valueNodeBatch{
db: db,
ops: make(map[Key]*node, defaultBatchOpsLength),
}
}

func (db *valueNodeDB) Get(key Key) (*node, error) {
if cachedValue, isCached := db.nodeCache.Get(key); isCached {
db.metrics.ValueNodeCacheHit()
Expand Down Expand Up @@ -93,47 +102,11 @@ func (db *valueNodeDB) Clear() error {
return database.AtomicClearPrefix(db.baseDB, db.baseDB, valueNodePrefix)
}

// Batch of database operations
type valueNodeBatch struct {
db *valueNodeDB
ops map[Key]*node
}

func (b *valueNodeBatch) Put(key Key, value *node) {
b.ops[key] = value
}

func (b *valueNodeBatch) Delete(key Key) {
b.ops[key] = nil
}

// Write flushes any accumulated data to the underlying database.
func (b *valueNodeBatch) Write() error {
dbBatch := b.db.baseDB.NewBatch()
for key, n := range b.ops {
b.db.metrics.DatabaseNodeWrite()
b.db.nodeCache.Put(key, n)
prefixedKey := addPrefixToKey(b.db.bufferPool, valueNodePrefix, key.Bytes())

var err error
if n == nil {
err = dbBatch.Delete(*prefixedKey)
} else {
err = dbBatch.Put(*prefixedKey, n.bytes())
}
b.db.bufferPool.Put(prefixedKey)
if err != nil {
return err
}
}

return dbBatch.Write()
}

type iterator struct {
db *valueNodeDB
nodeIter database.Iterator
current *node
key []byte
value []byte
err error
}

Expand All @@ -148,21 +121,16 @@ func (i *iterator) Error() error {
}

func (i *iterator) Key() []byte {
if i.current == nil {
return nil
}
return i.current.key.Bytes()
return i.key
}

func (i *iterator) Value() []byte {
if i.current == nil {
return nil
}
return i.current.value.Value()
return i.value
}

func (i *iterator) Next() bool {
i.current = nil
i.key = nil
i.value = nil
if i.Error() != nil || i.db.closed.Get() {
return false
}
Expand All @@ -171,15 +139,25 @@ func (i *iterator) Next() bool {
}

i.db.metrics.DatabaseNodeRead()
key := i.nodeIter.Key()
key = key[valueNodePrefixLen:]
n, err := parseNode(ToKey(key), i.nodeIter.Value())

r := codecReader{
b: i.nodeIter.Value(),
// We are discarding the other bytes from the node, so we avoid copying
// the value here.
copy: false,
}
maybeValue, err := r.MaybeBytes()
if err != nil {
i.err = err
return false
}
if maybeValue.IsNothing() {
i.err = errNodeMissingValue
return false
}

i.current = n
i.key = i.nodeIter.Key()[valueNodePrefixLen:]
i.value = maybeValue.Value()
return true
}

Expand Down