Refactor MerkleDB.commitChanges #2921

Merged · 16 commits · Apr 8, 2024
130 changes: 52 additions & 78 deletions database/prefixdb/db.go
@@ -9,13 +9,10 @@ import (
"sync"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/hashing"
)

const (
defaultBufCap = 256
)

var (
_ database.Database = (*Database)(nil)
_ database.Batch = (*batch)(nil)
@@ -26,9 +23,8 @@ var (
// a unique value.
type Database struct {
// All keys in this db begin with this byte slice
dbPrefix []byte
// Holds unused []byte
bufferPool sync.Pool
dbPrefix []byte
bufferPool *utils.BytesPool

// lock needs to be held during Close to guarantee db will not be set to nil
// concurrently with another operation. All other operations can hold RLock.
@@ -40,13 +36,9 @@ type Database struct {

func newDB(prefix []byte, db database.Database) *Database {
return &Database{
dbPrefix: prefix,
db: db,
bufferPool: sync.Pool{
New: func() interface{} {
return make([]byte, 0, defaultBufCap)
},
},
dbPrefix: prefix,
db: db,
bufferPool: utils.NewBytesPool(),
}
}

@@ -91,9 +83,6 @@ func PrefixKey(prefix, key []byte) []byte {
return prefixedKey
}

// Assumes that it is OK for the argument to db.db.Has
// to be modified after db.db.Has returns
// [key] may be modified after this method returns.
func (db *Database) Has(key []byte) (bool, error) {
db.lock.RLock()
defer db.lock.RUnlock()
@@ -102,14 +91,11 @@ func (db *Database) Has(key []byte) (bool, error) {
return false, database.ErrClosed
}
prefixedKey := db.prefix(key)
has, err := db.db.Has(prefixedKey)
db.bufferPool.Put(prefixedKey)
return has, err
defer db.bufferPool.Put(prefixedKey)

return db.db.Has(*prefixedKey)
}

// Assumes that it is OK for the argument to db.db.Get
// to be modified after db.db.Get returns.
// [key] may be modified after this method returns.
func (db *Database) Get(key []byte) ([]byte, error) {
db.lock.RLock()
defer db.lock.RUnlock()
@@ -118,15 +104,11 @@ func (db *Database) Get(key []byte) ([]byte, error) {
return nil, database.ErrClosed
}
prefixedKey := db.prefix(key)
val, err := db.db.Get(prefixedKey)
db.bufferPool.Put(prefixedKey)
return val, err
defer db.bufferPool.Put(prefixedKey)

return db.db.Get(*prefixedKey)
}

// Assumes that it is OK for the argument to db.db.Put
// to be modified after db.db.Put returns.
// [key] can be modified after this method returns.
// [value] should not be modified.
func (db *Database) Put(key, value []byte) error {
db.lock.RLock()
defer db.lock.RUnlock()
@@ -135,14 +117,11 @@ func (db *Database) Put(key, value []byte) error {
return database.ErrClosed
}
prefixedKey := db.prefix(key)
err := db.db.Put(prefixedKey, value)
db.bufferPool.Put(prefixedKey)
return err
defer db.bufferPool.Put(prefixedKey)

return db.db.Put(*prefixedKey, value)
}

// Assumes that it is OK for the argument to db.db.Delete
// to be modified after db.db.Delete returns.
// [key] may be modified after this method returns.
func (db *Database) Delete(key []byte) error {
db.lock.RLock()
defer db.lock.RUnlock()
@@ -151,9 +130,9 @@ func (db *Database) Delete(key []byte) error {
return database.ErrClosed
}
prefixedKey := db.prefix(key)
err := db.db.Delete(prefixedKey)
db.bufferPool.Put(prefixedKey)
return err
defer db.bufferPool.Put(prefixedKey)

return db.db.Delete(*prefixedKey)
}

func (db *Database) NewBatch() database.Batch {
@@ -186,15 +165,17 @@ func (db *Database) NewIteratorWithStartAndPrefix(start, prefix []byte) database
Err: database.ErrClosed,
}
}

prefixedStart := db.prefix(start)
defer db.bufferPool.Put(prefixedStart)

prefixedPrefix := db.prefix(prefix)
it := &iterator{
Iterator: db.db.NewIteratorWithStartAndPrefix(prefixedStart, prefixedPrefix),
defer db.bufferPool.Put(prefixedPrefix)

return &iterator{
Iterator: db.db.NewIteratorWithStartAndPrefix(*prefixedStart, *prefixedPrefix),
db: db,
}
db.bufferPool.Put(prefixedStart)
db.bufferPool.Put(prefixedPrefix)
return it
}

func (db *Database) Compact(start, limit []byte) error {
@@ -204,7 +185,14 @@ func (db *Database) Compact(start, limit []byte) error {
if db.closed {
return database.ErrClosed
}
return db.db.Compact(db.prefix(start), db.prefix(limit))

prefixedStart := db.prefix(start)
defer db.bufferPool.Put(prefixedStart)

prefixedLimit := db.prefix(limit)
defer db.bufferPool.Put(prefixedLimit)

return db.db.Compact(*prefixedStart, *prefixedLimit)
}

func (db *Database) Close() error {
@@ -236,23 +224,12 @@ func (db *Database) HealthCheck(ctx context.Context) (interface{}, error) {
}

// Return a copy of [key], prepended with this db's prefix.
// The returned slice should be put back in the pool
// when it's done being used.
func (db *Database) prefix(key []byte) []byte {
// Get a []byte from the pool
prefixedKey := db.bufferPool.Get().([]byte)
// The returned slice should be put back in the pool when it's done being used.
func (db *Database) prefix(key []byte) *[]byte {
keyLen := len(db.dbPrefix) + len(key)
if cap(prefixedKey) >= keyLen {
// The [] byte we got from the pool is big enough to hold the prefixed key
prefixedKey = prefixedKey[:keyLen]
} else {
// The []byte from the pool wasn't big enough.
// Put it back and allocate a new, bigger one
db.bufferPool.Put(prefixedKey)
prefixedKey = make([]byte, keyLen)
}
copy(prefixedKey, db.dbPrefix)
copy(prefixedKey[len(db.dbPrefix):], key)
prefixedKey := db.bufferPool.Get(keyLen)
copy(*prefixedKey, db.dbPrefix)
copy((*prefixedKey)[len(db.dbPrefix):], key)
return prefixedKey
}

@@ -264,33 +241,32 @@ type batch struct {
// Each key is prepended with the database's prefix.
// Each byte slice underlying a key should be returned to the pool
// when this batch is reset.
ops []database.BatchOp
ops []batchOp
}

type batchOp struct {
Key *[]byte
Value []byte
Delete bool
}

// Assumes that it is OK for the argument to b.Batch.Put
// to be modified after b.Batch.Put returns
// [key] may be modified after this method returns.
// [value] may be modified after this method returns.
func (b *batch) Put(key, value []byte) error {
prefixedKey := b.db.prefix(key)
copiedValue := slices.Clone(value)
b.ops = append(b.ops, database.BatchOp{
b.ops = append(b.ops, batchOp{
Key: prefixedKey,
Value: copiedValue,
})
return b.Batch.Put(prefixedKey, copiedValue)
return b.Batch.Put(*prefixedKey, copiedValue)
}

// Assumes that it is OK for the argument to b.Batch.Delete
// to be modified after b.Batch.Delete returns
// [key] may be modified after this method returns.
func (b *batch) Delete(key []byte) error {
prefixedKey := b.db.prefix(key)
b.ops = append(b.ops, database.BatchOp{
b.ops = append(b.ops, batchOp{
Key: prefixedKey,
Delete: true,
})
return b.Batch.Delete(prefixedKey)
return b.Batch.Delete(*prefixedKey)
}

// Write flushes any accumulated data to the memory database.
@@ -316,19 +292,17 @@ func (b *batch) Reset() {

// Clear b.writes
if cap(b.ops) > len(b.ops)*database.MaxExcessCapacityFactor {
b.ops = make([]database.BatchOp, 0, cap(b.ops)/database.CapacityReductionFactor)
b.ops = make([]batchOp, 0, cap(b.ops)/database.CapacityReductionFactor)
} else {
b.ops = b.ops[:0]
}
b.Batch.Reset()
}

// Replay replays the batch contents.
// Assumes it's safe to modify the key argument to w.Delete and w.Put
// after those methods return.
// Replay the batch contents.
func (b *batch) Replay(w database.KeyValueWriterDeleter) error {
for _, op := range b.ops {
keyWithoutPrefix := op.Key[len(b.db.dbPrefix):]
keyWithoutPrefix := (*op.Key)[len(b.db.dbPrefix):]
if op.Delete {
if err := w.Delete(keyWithoutPrefix); err != nil {
return err
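For reference, the calling pattern on top of this refactor is unchanged. Below is a minimal, illustrative sketch of using prefixdb with an in-memory backing store; it assumes the package's exported New constructor (untouched by this PR) and uses memdb only as a stand-in database:

```go
package main

import (
	"fmt"

	"github.com/ava-labs/avalanchego/database/memdb"
	"github.com/ava-labs/avalanchego/database/prefixdb"
)

func main() {
	// Wrap an in-memory database; every key is transparently prefixed.
	db := prefixdb.New([]byte("chain-1/"), memdb.New())

	// Internally, Put and Get build the prefixed key from the shared
	// BytesPool and return the buffer to the pool before returning.
	if err := db.Put([]byte("height"), []byte{42}); err != nil {
		panic(err)
	}
	got, err := db.Get([]byte("height"))
	fmt.Println(got, err) // [42] <nil>
}
```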
71 changes: 70 additions & 1 deletion utils/bytes.go
@@ -3,7 +3,11 @@

package utils

import "crypto/rand"
import (
"crypto/rand"
"math/bits"
"sync"
)

// RandomBytes returns a slice of n random bytes
// Intended for use in testing
@@ -12,3 +16,68 @@ func RandomBytes(n int) []byte {
_, _ = rand.Read(b)
return b
}

// Constant taken from the "math" package
const intSize = 32 << (^uint(0) >> 63) // 32 or 64

// BytesPool tracks buckets of available buffers to be allocated. Each bucket
// allocates buffers of the following length:
//
// 0
// 1
// 3
// 7
// 15
// 31
// 63
// 127
// ...
// MaxInt
//
// In order to allocate a buffer of length 19 (for example), we calculate the
// number of bits required to represent 19 (5) and therefore allocate a slice
// from bucket 5, which has length 31. This is the bucket that produces the
// smallest slices of at least length 19.
//
// When returning a buffer of capacity 19, we calculate the number of bits
// required to represent 20 (5) and therefore place the slice into bucket 4,
// which has length 15. This is the bucket that produces the largest slices
// a capacity-19 buffer can still back.
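//
// Illustrative mapping (derived from the rules above):
//
//	Get(0)  -> bits.Len(0)  = 0 -> bucket 0 (length 0)
//	Get(1)  -> bits.Len(1)  = 1 -> bucket 1 (length 1)
//	Get(19) -> bits.Len(19) = 5 -> bucket 5 (length 31)
//	Put of a capacity-19 buffer -> bits.Len(20)-1 = 4 -> bucket 4 (length 15)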
type BytesPool [intSize]sync.Pool

func NewBytesPool() *BytesPool {
var p BytesPool
for i := range p {
// uint is used here to avoid overflowing int during the shift
size := uint(1)<<i - 1
p[i] = sync.Pool{
New: func() interface{} {
// Sync pool needs to return pointer-like values to avoid memory
// allocations.
b := make([]byte, size)
return &b
},
}
}
return &p
}

// Get returns a non-nil pointer to a slice with the requested length.
//
// It is not guaranteed for the returned bytes to have been zeroed.
func (p *BytesPool) Get(length int) *[]byte {
index := bits.Len(uint(length)) // Round up
bytes := p[index].Get().(*[]byte)
*bytes = (*bytes)[:length] // Set the length to be the expected value
return bytes
}

// Put takes ownership of a non-nil pointer to a slice of bytes.
//
// Note: this function takes ownership of the underlying array. So, the length
// of the provided slice is ignored and only its capacity is used.
func (p *BytesPool) Put(bytes *[]byte) {
size := cap(*bytes)
index := bits.Len(uint(size)+1) - 1 // Round down
p[index].Put(bytes)
}
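
To make the bucket math concrete, here is a minimal sketch of a round trip through the new pool. The capacities shown follow from the power-of-two-minus-one bucket sizes described above (a freshly allocated bucket-5 slice has capacity exactly 31):

```go
package main

import (
	"fmt"

	"github.com/ava-labs/avalanchego/utils"
)

func main() {
	pool := utils.NewBytesPool()

	// Get rounds up: bits.Len(19) = 5, so the slice comes from bucket 5.
	// Its length is set to exactly 19; its capacity is 2^5-1 = 31.
	buf := pool.Get(19)
	fmt.Println(len(*buf), cap(*buf)) // 19 31

	// Put rounds down by capacity: bits.Len(31+1)-1 = 5, so the buffer
	// returns to bucket 5 and can serve any future Get(n) with n <= 31.
	pool.Put(buf)
}
```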