Skip to content

Commit

Permalink
Merge pull request #639 from fuweid/cp-copy-key-before-seek
Browse files Browse the repository at this point in the history
[1.3] bucket.Put: copy key before seek
  • Loading branch information
ahrtr committed Dec 18, 2023
2 parents defa564 + fabe2fb commit e102fcf
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 11 deletions.
30 changes: 19 additions & 11 deletions bucket.go
Expand Up @@ -162,12 +162,17 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
return nil, ErrBucketNameRequired
}

// Insert into node.
// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
// it from being marked as leaking, and accordingly cannot be allocated on stack.
newKey := cloneBytes(key)

// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)
k, _, flags := c.seek(newKey)

// Return an error if there is an existing key.
if bytes.Equal(key, k) {
if bytes.Equal(newKey, k) {
if (flags & bucketLeafFlag) != 0 {
return nil, ErrBucketExists
}
Expand All @@ -182,16 +187,14 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
}
var value = bucket.write()

// Insert into node.
key = cloneBytes(key)
c.node().put(key, key, value, 0, bucketLeafFlag)
c.node().put(newKey, newKey, value, 0, bucketLeafFlag)

// Since subbuckets are not allowed on inline buckets, we need to
// dereference the inline page, if it exists. This will cause the bucket
// to be treated as a regular, non-inline bucket for the rest of the tx.
b.page = nil

return b.Bucket(key), nil
return b.Bucket(newKey), nil
}

// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
Expand Down Expand Up @@ -288,18 +291,23 @@ func (b *Bucket) Put(key []byte, value []byte) error {
return ErrValueTooLarge
}

// Insert into node.
// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
// it from being marked as leaking, and accordingly cannot be allocated on stack.
newKey := cloneBytes(key)

// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)
k, _, flags := c.seek(newKey)

// Return an error if there is an existing key with a bucket value.
if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
if bytes.Equal(newKey, k) && (flags&bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}

// Insert into node.
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)
// gofail: var beforeBucketPut struct{}

c.node().put(newKey, newKey, value, 0, 0)

return nil
}
Expand Down
115 changes: 115 additions & 0 deletions tests/failpoint/db_failpoint_test.go
Expand Up @@ -94,3 +94,118 @@ func TestFailpoint_mLockFail_When_remap(t *testing.T) {

require.NoError(t, err)
}

// TestIssue72 reproduces issue 72.
//
// When bbolt is processing a `Put` invocation, the key might be concurrently
// updated by the application which calls the `Put` API (although it shouldn't).
// It might lead to a situation that bbolt use an old key to find a proper
// position to insert the key/value pair, but actually inserts a new key.
// Eventually it might break the rule that all keys should be sorted. In a
// worse case, it might cause page elements to point to already freed pages.
//
// REF: https://github.com/etcd-io/bbolt/issues/72
func TestIssue72(t *testing.T) {
db := btesting.MustCreateDBWithOption(t, &bolt.Options{PageSize: 4096})

bucketName := []byte(t.Name())
err := db.Update(func(tx *bolt.Tx) error {
_, txerr := tx.CreateBucket(bucketName)
return txerr
})
require.NoError(t, err)

// The layout is like:
//
// +--+--+--+
// +------+1 |3 |10+---+
// | +-++--+--+ |
// | | |
// | | |
// +v-+--+ +v-+--+ +-v+--+--+
// |1 |2 | |3 |4 | |10|11|12|
// +--+--+ +--+--+ +--+--+--+
//
err = db.Update(func(tx *bolt.Tx) error {
bk := tx.Bucket(bucketName)

for _, id := range []int{1, 2, 3, 4, 10, 11, 12} {
if txerr := bk.Put(idToBytes(id), make([]byte, 1000)); txerr != nil {
return txerr
}
}
return nil
})
require.NoError(t, err)

require.NoError(t, gofail.Enable("beforeBucketPut", `sleep(5000)`))

// +--+--+--+
// +------+1 |3 |1 +---+
// | +-++--+--+ |
// | | |
// | | |
// +v-+--+ +v-+--+ +-v+--+--+--+
// |1 |2 | |3 |4 | |1 |10|11|12|
// +--+--+ +--+--+ +--+--+--+--+
//
key := idToBytes(13)
updatedKey := idToBytes(1)
err = db.Update(func(tx *bolt.Tx) error {
bk := tx.Bucket(bucketName)

go func() {
time.Sleep(3 * time.Second)
copy(key, updatedKey)
}()
return bk.Put(key, make([]byte, 100))
})
require.NoError(t, err)

require.NoError(t, gofail.Disable("beforeBucketPut"))

// bbolt inserts 100 into last branch page. Since there are two `1`
// keys in branch, spill operation will update first `1` pointer and
// then last one won't be updated and continues to point to freed page.
//
//
// +--+--+--+
// +---------------+1 |3 |1 +---------+
// | +--++-+--+ |
// | | |
// | | |
// | +--+--+ +v-+--+ +-----v-----+
// | |1 |2 | |3 |4 | |freed page |
// | +--+--+ +--+--+ +-----------+
// |
// +v-+--+--+--+---+
// |1 |10|11|12|100|
// +--+--+--+--+---+
err = db.Update(func(tx *bolt.Tx) error {
return tx.Bucket(bucketName).Put(idToBytes(100), make([]byte, 100))
})
require.NoError(t, err)

defer func() {
if r := recover(); r != nil {
t.Logf("panic info:\n %v", r)
}
}()

// Add more keys to ensure branch node to spill.
err = db.Update(func(tx *bolt.Tx) error {
bk := tx.Bucket(bucketName)

for _, id := range []int{101, 102, 103, 104, 105} {
if txerr := bk.Put(idToBytes(id), make([]byte, 1000)); txerr != nil {
return txerr
}
}
return nil
})
require.NoError(t, err)
}

func idToBytes(id int) []byte {
return []byte(fmt.Sprintf("%010d", id))
}

0 comments on commit e102fcf

Please sign in to comment.