Skip to content

Commit

Permalink
Many GC related changes (#479)
Browse files Browse the repository at this point in the history
Max entries per value log file:
To ensure that we can GC a file in bounded time, we should restrict the max number of key-value pairs that can be contained by one log file. A new option now enforces that, taking the minimum of value log file size, and the number of entries contained to determine when a file should be rotated.

New move keyspace:
We should never write an entry with an older timestamp for the same key. We need to maintain this invariant to search for the latest value of a key, or else we need to search in all tables and find the max version among them.

- Value log GC moves key-value pairs to new files. This used to cause older versions of keys to be written to the top of LSM tree. This change puts those moves into a separate keyspace, prefixed by badgerMove. When doing a value read, we can look into this keyspace if the previous value pointer is no longer valid (pointed to a deleted value log file).
- Add an integration test, which runs for 3 mins and ensures that GC moves work as expected.
- Switch db.get back to stopping as soon as we encounter the first key (instead of looking into all levels).
- Collect value log GC stats when doing compactions.
- Trace log the entire value log GC, so we know how the file was picked and the result of sampling.
- Pick two value log files for GC. First from discard stats, and second randomly. Then try to GC both, and return on whichever is successful.
- Add a function to delete move keys corresponding to a value log file deletion.
- Increase the value threshold to 32 by default.
  • Loading branch information
manishrjain committed May 8, 2018
1 parent 1640484 commit 7af0076
Show file tree
Hide file tree
Showing 9 changed files with 464 additions and 117 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,10 @@ observe the full IOPS throughput provided by modern SSDs. In Dgraph, we have set
it to 128. For more details, [see this
thread](https://groups.google.com/d/topic/golang-nuts/jPb_h3TvlKE/discussion).

- Are there any linux specific settings that I should use?

We recommend setting max file descriptors to a high number depending upon the expected size of you data.

## Contact
- Please use [discuss.dgraph.io](https://discuss.dgraph.io) for questions, feature requests and discussions.
- Please use [Github issue tracker](https://github.com/dgraph-io/badger/issues) for filing bugs or feature requests.
Expand Down
28 changes: 10 additions & 18 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
badgerPrefix = []byte("!badger!") // Prefix for internal keys used by badger.
head = []byte("!badger!head") // For storing value offset for replay.
txnKey = []byte("!badger!txn") // For indicating end of entries in txn.
badgerMove = []byte("!badger!move") // For key-value pairs which got moved during GC.
)

type closers struct {
Expand Down Expand Up @@ -485,33 +486,25 @@ func (db *DB) getMemTables() ([]*skl.Skiplist, func()) {

// get returns the value in memtable or disk for given key.
// Note that value will include meta byte.
//
// IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
// maintain this invariant to search for the latest value of a key, or else we need to search in all
// tables and find the max version among them. To maintain this invariant, we also need to ensure
// that all versions of a key are always present in the same table from level 1, because compaction
// can push any table down.
func (db *DB) get(key []byte) (y.ValueStruct, error) {
tables, decr := db.getMemTables() // Lock should be released.
defer decr()

y.NumGets.Add(1)
version := y.ParseTs(key)
var maxVs y.ValueStruct
// Need to search for values in all tables, with managed db
// latest value needn't be present in the latest table.
// Even without managed db, purging can cause this constraint
// to be violated.
// Search until required version is found or iterate over all
// tables and return max version.
for i := 0; i < len(tables); i++ {
vs := tables[i].Get(key)
y.NumMemtableGets.Add(1)
if vs.Meta == 0 && vs.Value == nil {
continue
}
if vs.Version == version {
if vs.Meta != 0 || vs.Value != nil {
return vs, nil
}
if maxVs.Version < vs.Version {
maxVs = vs
}
}
return db.lc.get(key, maxVs)
return db.lc.get(key)
}

func (db *DB) updateOffset(ptrs []valuePointer) {
Expand Down Expand Up @@ -967,8 +960,7 @@ func (db *DB) RunValueLogGC(discardRatio float64) error {
// Find head on disk
headKey := y.KeyWithTs(head, math.MaxUint64)
// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
var maxVs y.ValueStruct
val, err := db.lc.get(headKey, maxVs)
val, err := db.lc.get(headKey)
if err != nil {
return errors.Wrap(err, "Retrieving head from on-disk LSM")
}
Expand Down
1 change: 1 addition & 0 deletions integration/testgc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/testgc
221 changes: 221 additions & 0 deletions integration/testgc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package main

import (
"encoding/binary"
"fmt"
"log"
"math/rand"
"net/http"
_ "net/http/pprof"
"os"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/y"
)

var Max int64 = 10000000
var suffix = make([]byte, 128)

type S struct {
sync.Mutex
vals map[uint64]uint64

count uint64 // Not under mutex lock.
}

func encoded(i uint64) []byte {
out := make([]byte, 8)
binary.BigEndian.PutUint64(out, i)
return out
}

func (s *S) write(db *badger.DB) error {
return db.Update(func(txn *badger.Txn) error {
for i := 0; i < 10; i++ {
// These keys would be overwritten.
keyi := uint64(rand.Int63n(Max))
key := encoded(keyi)
vali := atomic.AddUint64(&s.count, 1)
val := encoded(vali)
val = append(val, suffix...)
if err := txn.Set(key, val); err != nil {
return err
}
}
for i := 0; i < 20; i++ {
// These keys would be new and never overwritten.
keyi := atomic.AddUint64(&s.count, 1)
if keyi%1000000 == 0 {
log.Printf("Count: %d\n", keyi)
}
key := encoded(keyi)
val := append(key, suffix...)
if err := txn.Set(key, val); err != nil {
return err
}
}
return nil
})
}

func (s *S) read(db *badger.DB) error {
max := int64(atomic.LoadUint64(&s.count))
keyi := uint64(rand.Int63n(max))
key := encoded(keyi)

err := db.View(func(txn *badger.Txn) error {
item, err := txn.Get(key)
if err != nil {
return err
}
val, err := item.Value()
if err != nil {
return err
}
y.AssertTruef(len(val) == len(suffix)+8, "Found val of len: %d\n", len(val))
vali := binary.BigEndian.Uint64(val[0:8])
s.Lock()
expected := s.vals[keyi]
if vali < expected {
log.Fatalf("Expected: %d. Found: %d. Key: %d\n", expected, vali, keyi)
} else if vali == expected {
// pass
} else {
s.vals[keyi] = vali
}
s.Unlock()
return nil
})
if err == badger.ErrKeyNotFound {
return nil
}
return err
}

func main() {
fmt.Println("Badger Integration test for value log GC.")

// dir, err := ioutil.TempDir("./", "badger")
// if err != nil {
// log.Fatal(err)
// }
dir := "/mnt/drive/badgertest"
os.RemoveAll(dir)
// defer os.RemoveAll(dir)

opts := badger.DefaultOptions
opts.Dir = dir
opts.ValueDir = dir
opts.TableLoadingMode = options.MemoryMap
opts.ValueLogLoadingMode = options.FileIO
// opts.ValueLogFileSize = 64 << 20 // 64 MB.
opts.SyncWrites = false

db, err := badger.Open(opts)
if err != nil {
log.Fatal(err)
}
defer db.Close()

go http.ListenAndServe("localhost:8080", nil)

closer := y.NewCloser(11)
go func() {
// Run value log GC.
defer closer.Done()
var count int
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
select {
case <-closer.HasBeenClosed():
log.Printf("Num times value log GC was successful: %d\n", count)
return
default:
}
log.Printf("Starting a value log GC")
err := db.RunValueLogGC(0.1)
log.Printf("Result of value log GC: %v\n", err)
if err == nil {
count++
}
}
}()

s := S{
count: uint64(Max),
vals: make(map[uint64]uint64),
}
var numLoops uint64
ticker := time.NewTicker(5 * time.Second)
for i := 0; i < 10; i++ {
go func() {
defer closer.Done()
for {
if err := s.write(db); err != nil {
log.Fatal(err)
}
for j := 0; j < 10; j++ {
if err := s.read(db); err != nil {
log.Fatal(err)
}
}
nl := atomic.AddUint64(&numLoops, 1)
select {
case <-closer.HasBeenClosed():
return
case <-ticker.C:
log.Printf("Num loops: %d\n", nl)
default:
}
}
}()
}
time.Sleep(5 * time.Minute)
log.Println("Signaling...")
closer.SignalAndWait()
log.Println("Wait done. Now iterating over everything.")

err = db.View(func(txn *badger.Txn) error {
iopts := badger.DefaultIteratorOptions
itr := txn.NewIterator(iopts)
defer itr.Close()

var total, tested int
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
key := item.Key()
keyi := binary.BigEndian.Uint64(key)
total++

val, err := item.Value()
if err != nil {
return err
}
if len(val) < 8 {
log.Printf("Unexpected value: %x\n", val)
continue
}
vali := binary.BigEndian.Uint64(val[0:8])

expected, ok := s.vals[keyi] // Not all keys must be in vals map.
if ok {
tested++
if vali < expected {
// vali must be equal or greater than what's in the map.
log.Fatalf("Expected: %d. Got: %d. Key: %d\n", expected, vali, keyi)
}
}
}
log.Printf("Total iterated: %d. Tested values: %d\n", total, tested)
return nil
})
if err != nil {
log.Fatalf("Error while iterating: %v", err)
}
log.Println("Iteration done. Test successful.")
}
25 changes: 23 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,26 @@ func (item *Item) yieldItemValue() ([]byte, func(), error) {

var vp valuePointer
vp.Decode(item.vptr)
return item.db.vlog.Read(vp, item.slice)
result, cb, err := item.db.vlog.Read(vp, item.slice)
if err != ErrRetry {
return result, cb, err
}

// The value pointer is pointing to a deleted value log. Look for the move key and read that
// instead.
runCallback(cb)
key := y.KeyWithTs(item.Key(), item.Version())
moveKey := append(badgerMove, key...)
vs, err := item.db.get(moveKey)
if err != nil {
return nil, nil, err
}
if vs.Version != item.Version() {
return nil, nil, nil
}
item.vptr = vs.Value
item.meta |= vs.Meta // This meta would only be about value pointer.
return item.yieldItemValue()
}

func runCallback(cb func()) {
Expand Down Expand Up @@ -257,6 +276,8 @@ type IteratorOptions struct {
PrefetchSize int
Reverse bool // Direction of iteration. False is forward, true is backward.
AllVersions bool // Fetch all valid versions of the same key.

internalAccess bool // Used to allow internal access to badger keys.
}

// DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
Expand Down Expand Up @@ -402,7 +423,7 @@ func (it *Iterator) parseItem() bool {
}

// Skip badger keys.
if bytes.HasPrefix(key, badgerPrefix) {
if !it.opt.internalAccess && bytes.HasPrefix(key, badgerPrefix) {
mi.Next()
return false
}
Expand Down
Loading

0 comments on commit 7af0076

Please sign in to comment.