Skip to content

Commit

Permalink
Seek all levels of LSM tree for move keyspace (#580)
Browse files Browse the repository at this point in the history
In Badger move keyspace, an older version of the key can be written to the LSM tree after a newer version of the key. After some compactions, this can mean that a higher level in the tree can have an older version, while a lower level can have a newer version. To fix this, we must seek through all the levels to find the right version -- we only need to do this for the move keyspace.

Note that we used to do this exact logic on the entire keyspace before. Move keyspace was introduced to avoid the need to seek through all levels. Move keyspace is still a great thing because we only need to seek all levels for move keys -- seeks for the normal keyspace can be stopped once any version of the key is found.

Fixes #578.
  • Loading branch information
manishrjain committed Sep 23, 2018
1 parent a1528d1 commit af99e5f
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 5 deletions.
33 changes: 30 additions & 3 deletions db.go
Expand Up @@ -17,6 +17,7 @@
package badger

import (
"bytes"
"encoding/binary"
"expvar"
"log"
Expand Down Expand Up @@ -495,19 +496,45 @@ func (db *DB) getMemTables() ([]*skl.Skiplist, func()) {
// tables and find the max version among them. To maintain this invariant, we also need to ensure
// that all versions of a key are always present in the same table from level 1, because compaction
// can push any table down.
//
// Update (Sep 22, 2018): To maintain the above invariant, and to allow keys to be moved from one
// value log to another (while reclaiming space during value log GC), we have logically moved this
// need to write "old versions after new versions" to the badgerMove keyspace. Thus, for normal
// gets, we can stop going down the LSM tree once we find any version of the key (note however that
// we will ALWAYS skip versions with ts greater than the key version). However, if that key has
// been moved, then for the corresponding movekey, we'll look through all the levels of the tree
// to ensure that we pick the highest version of the movekey present.
func (db *DB) get(key []byte) (y.ValueStruct, error) {
tables, decr := db.getMemTables() // Lock should be released.
defer decr()

var maxVs *y.ValueStruct
var version uint64
if bytes.HasPrefix(key, badgerMove) {
// If we are checking badgerMove key, we should look into all the
// levels, so we can pick up the newer versions, which might have been
// compacted down the tree.
maxVs = &y.ValueStruct{}
version = y.ParseTs(key)
}

y.NumGets.Add(1)
for i := 0; i < len(tables); i++ {
vs := tables[i].Get(key)
y.NumMemtableGets.Add(1)
if vs.Meta != 0 || vs.Value != nil {
if vs.Meta == 0 && vs.Value == nil {
continue
}
// Found a version of the key. For user keyspace, return immediately. For move keyspace,
// continue iterating, unless we found a version == given key version.
if maxVs == nil || vs.Version == version {
return vs, nil
}
if maxVs.Version < vs.Version {
*maxVs = vs
}
}
return db.lc.get(key)
return db.lc.get(key, maxVs)
}

func (db *DB) updateOffset(ptrs []valuePointer) {
Expand Down Expand Up @@ -969,7 +996,7 @@ func (db *DB) RunValueLogGC(discardRatio float64) error {
// Find head on disk
headKey := y.KeyWithTs(head, math.MaxUint64)
// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
val, err := db.lc.get(headKey)
val, err := db.lc.get(headKey, nil)
if err != nil {
return errors.Wrap(err, "Retrieving head from on-disk LSM")
}
Expand Down
13 changes: 11 additions & 2 deletions levels.go
Expand Up @@ -773,12 +773,13 @@ func (s *levelsController) close() error {
}

// get returns the found value if any. If not found, we return nil.
func (s *levelsController) get(key []byte) (y.ValueStruct, error) {
func (s *levelsController) get(key []byte, maxVs *y.ValueStruct) (y.ValueStruct, error) {
// It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
// in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
// read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
// parallelize this, we will need to call the h.RLock() function by increasing order of level
// number.)
version := y.ParseTs(key)
for _, h := range s.levels {
vs, err := h.get(key) // Calls h.RLock() and h.RUnlock().
if err != nil {
Expand All @@ -787,7 +788,15 @@ func (s *levelsController) get(key []byte) (y.ValueStruct, error) {
if vs.Value == nil && vs.Meta == 0 {
continue
}
return vs, nil
if maxVs == nil || vs.Version == version {
return vs, nil
}
if maxVs.Version < vs.Version {
*maxVs = vs
}
}
if maxVs != nil {
return *maxVs, nil
}
return y.ValueStruct{}, nil
}
Expand Down
72 changes: 72 additions & 0 deletions value_test.go
Expand Up @@ -17,6 +17,7 @@
package badger

import (
"bytes"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -677,6 +678,77 @@ func checkKeys(t *testing.T, kv *DB, keys [][]byte) {
require.Equal(t, i, len(keys))
}

// Test Bug #578, which showed that if a value is moved during value log GC, an
// older version can end up at a higher level in the LSM tree than a newer
// version, causing the data to not be returned.
// TestBug578 reproduces Bug #578: if a value is moved during value log GC, an
// older version can end up at a higher level in the LSM tree than a newer
// version, causing the data to not be returned. The small table size and low
// ValueLogMaxEntries force frequent compactions and value log rotations so
// the move keyspace is exercised.
func TestBug578(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger")
	y.Check(err)
	defer os.RemoveAll(dir)

	opts := DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	opts.ValueLogMaxEntries = 64 // Rotate value logs quickly so GC has work to do.
	opts.MaxTableSize = 1 << 13  // Tiny tables force compactions down the tree.

	db, err := Open(opts)
	require.NoError(t, err)
	// FIX: the DB was never closed, leaking file handles and lock files
	// across test runs.
	defer db.Close()

	// key builds a wide, padded key so that tables fill up quickly.
	key := func(i int) []byte {
		return []byte(fmt.Sprintf("%d%100d", i, i))
	}

	value := make([]byte, 100)
	y.Check2(rand.Read(value))

	// writeRange sets key(i) = value for every i in [from, to).
	writeRange := func(from, to int) {
		for i := from; i < to; i++ {
			err := db.Update(func(txn *Txn) error {
				return txn.Set(key(i), value)
			})
			require.NoError(t, err)
		}
	}

	// readRange asserts that every key in [from, to) is present and holds
	// the expected value.
	readRange := func(from, to int) {
		for i := from; i < to; i++ {
			err := db.View(func(txn *Txn) error {
				item, err := txn.Get(key(i))
				if err != nil {
					return err
				}
				val, err := item.Value()
				if err != nil {
					return err
				}

				if !bytes.Equal(val, value) {
					t.Fatalf("Invalid value for key: %q", key(i))
				}
				return nil
			})
			require.NoError(t, err)
		}
	}

	// Let's run this whole thing a few times.
	for j := 0; j < 10; j++ {
		t.Logf("Cycle: %d\n", j)
		writeRange(0, 32)
		writeRange(0, 10)
		writeRange(50, 72)
		writeRange(40, 72)
		writeRange(40, 72)

		// Run value log GC a few times. GC is best-effort: it may return
		// ErrNoRewrite when there is nothing to collect, so errors are
		// intentionally ignored here.
		for i := 0; i < 5; i++ {
			db.RunValueLogGC(0.5)
		}
		readRange(0, 10)
	}
}

func BenchmarkReadWrite(b *testing.B) {
rwRatio := []float32{
0.1, 0.2, 0.5, 1.0,
Expand Down

0 comments on commit af99e5f

Please sign in to comment.