Skip to content

Commit b9aae1b

Browse files
committed
Make KVItem.Value() method asynchronous, and return error.
KVItem.Value() now takes a func that is called with the slice containing the bytes for the value. This slice directly references the memory map for the value log. It allows us to avoid copying bytes from the mmap-ed value log. Other related changes: * Add a field to KVItem to hold a reference to the corresponding KV. * Add a field to KVItem to track whether the value has been prefetched. * Add a field to KVItem to record any errors during prefetching. * Change the semantics of iteration. We don’t fetch values by design anymore. So FetchValues does not apply normally. It only applies in the case of prefetching. So renamed it to PrefetchValues. PrefetchSize now only applies if PrefetchValues is true. * If PrefetchValues is true, we copy values from mmap and set item.val. * Remove KV.FillValue() method. We don’t fetch value synchronously anymore, this method is not relevant. * Refactor some private methods in value.go * Fixes to tests. Added a helper method to asynchronously retrieve the value and assign it to a variable.
1 parent b87ae27 commit b9aae1b

File tree

7 files changed

+257
-178
lines changed

7 files changed

+257
-178
lines changed

doc_test.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,38 @@ func Example() {
4141
var item badger.KVItem
4242
if err := kv.Get(key, &item); err != nil {
4343
fmt.Printf("Error while getting key: %q", key)
44+
return
4445
}
45-
fmt.Printf("GET %s %s\n", key, item.Value())
46+
var val []byte
47+
err := item.Value(func(v []byte) {
48+
val = make([]byte, len(v))
49+
copy(val, v)
50+
})
51+
if err != nil {
52+
fmt.Printf("Error while getting value for key: %q", key)
53+
return
54+
}
55+
56+
fmt.Printf("GET %s %s\n", key, val)
4657

4758
if err := kv.CompareAndSet(key, []byte("venus"), 100); err != nil {
4859
fmt.Println("CAS counter mismatch")
4960
} else {
5061
if err = kv.Get(key, &item); err != nil {
5162
fmt.Printf("Error while getting key: %q", key)
5263
}
53-
fmt.Printf("Set to %s\n", item.Value())
64+
65+
err := item.Value(func(v []byte) {
66+
val = make([]byte, len(v))
67+
copy(val, v)
68+
})
69+
70+
if err != nil {
71+
fmt.Printf("Error while getting value for key: %q", key)
72+
return
73+
}
74+
75+
fmt.Printf("Set to %s\n", val)
5476
}
5577
if err := kv.CompareAndSet(key, []byte("mars"), item.Counter()); err == nil {
5678
fmt.Println("Set to mars")

iterator.go

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,20 @@ import (
2323
"github.com/dgraph-io/badger/y"
2424
)
2525

26+
type prefetchStatus uint8
27+
28+
const (
29+
empty prefetchStatus = iota
30+
prefetched
31+
)
32+
2633
// KVItem is returned during iteration. Both the Key() and Value() output is only valid until
2734
// iterator.Next() is called.
2835
type KVItem struct {
36+
status prefetchStatus
37+
err error
2938
wg sync.WaitGroup
39+
kv *KV
3040
key []byte
3141
vptr []byte
3242
meta byte
@@ -42,12 +52,21 @@ func (item *KVItem) Key() []byte {
4252
return item.key
4353
}
4454

45-
// Value returns the value, generally fetched from the value log. This call can block while the
46-
// value is populated asynchronously via a disk read. Remember to parse or copy it if you need to
47-
// reuse it. DO NOT modify or append to this slice; it would result in internal data overwrite.
48-
func (item *KVItem) Value() []byte {
55+
// Value retrieves the value of the item from the value log. It calls the
56+
// consumer function with a slice argument representing the value. In case
57+
// of error, the consumer function is not called
58+
//
59+
// Remember to parse or copy it if you need to reuse it. DO NOT modify or
60+
// append to this slice; it would result in a panic.
61+
func (item *KVItem) Value(consumer func([]byte)) error {
4962
item.wg.Wait()
50-
return item.val
63+
if item.status == prefetched {
64+
if item.err != nil {
65+
return item.err
66+
}
67+
consumer(item.val)
68+
}
69+
return item.kv.yieldItemValue(item, consumer)
5170
}
5271

5372
func (item *KVItem) hasValue() bool {
@@ -62,9 +81,25 @@ func (item *KVItem) hasValue() bool {
6281
return true
6382
}
6483

65-
// EstimatedSize returns approximate size of the key-value pair. This can be called with
66-
// FetchValues=false, to quickly iterate through and estimate the size of a range of key-value
67-
// pairs (without fetching the corresponding values).
84+
func (item *KVItem) prefetchValue() {
85+
item.err = item.kv.yieldItemValue(item, func(val []byte) {
86+
if val == nil {
87+
return
88+
}
89+
buf := item.slice.Resize(len(val))
90+
// FIXME in case of non-mmaped read buf and val might be the same location, in
91+
// which case this is redundant. Not sure if this is a no-op in that case.
92+
copy(buf, val)
93+
item.val = buf
94+
item.status = prefetched
95+
})
96+
}
97+
98+
// EstimatedSize returns approximate size of the key-value pair.
99+
//
100+
// This can be called while iterating through a store to quickly estimate the
101+
// size of a range of key-value pairs (without fetching the corresponding
102+
// values).
68103
func (item *KVItem) EstimatedSize() int64 {
69104
if !item.hasValue() {
70105
return 0
@@ -121,16 +156,18 @@ func (l *list) pop() *KVItem {
121156

122157
// IteratorOptions is used to set options when iterating over Badger key-value stores.
123158
type IteratorOptions struct {
124-
PrefetchSize int // How many KV pairs to prefetch while iterating.
125-
FetchValues bool // Controls whether the values should be fetched from the value log.
159+
// Indicates whether we should prefetch values during iteration and store them.
160+
PrefetchValues bool
161+
// How many KV pairs to prefetch while iterating. Valid only if PrefetchValues is true.
162+
PrefetchSize int
126163
Reverse bool // Direction of iteration. False is forward, true is backward.
127164
}
128165

129166
// DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
130167
var DefaultIteratorOptions = IteratorOptions{
131-
PrefetchSize: 100,
132-
FetchValues: true,
133-
Reverse: false,
168+
PrefetchValues: false,
169+
PrefetchSize: 100,
170+
Reverse: false,
134171
}
135172

136173
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
@@ -147,7 +184,7 @@ type Iterator struct {
147184
func (it *Iterator) newItem() *KVItem {
148185
item := it.waste.pop()
149186
if item == nil {
150-
item = &KVItem{slice: new(y.Slice)}
187+
item = &KVItem{slice: new(y.Slice), kv: it.kv}
151188
}
152189
return item
153190
}
@@ -206,20 +243,20 @@ func (it *Iterator) fill(item *KVItem) {
206243
item.key = y.Safecopy(item.key, it.iitr.Key())
207244
item.vptr = y.Safecopy(item.vptr, vs.Value)
208245
item.val = nil
209-
if it.opt.FetchValues {
246+
if it.opt.PrefetchValues {
210247
item.wg.Add(1)
211248
go func() {
212-
it.kv.fillItem(item)
249+
// FIXME we are not handling errors here.
250+
item.prefetchValue()
213251
item.wg.Done()
214252
}()
215253
}
216254
}
217255

218256
func (it *Iterator) prefetch() {
219-
prefetchSize := it.opt.PrefetchSize
220-
if it.opt.PrefetchSize <= 1 {
221-
// Try prefetching atleast the first two items to put into it.item and it.data.
222-
prefetchSize = 2
257+
prefetchSize := 2
258+
if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
259+
prefetchSize = it.opt.PrefetchSize
223260
}
224261

225262
i := it.iitr

kv.go

Lines changed: 20 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,16 @@ func NewKV(optParam *Options) (out *KV, err error) {
180180
if err := out.Get(head, &item); err != nil {
181181
return nil, errors.Wrap(err, "Retrieving head")
182182
}
183-
val := item.Value()
183+
184+
var val []byte
185+
err = item.Value(func(v []byte) {
186+
val = make([]byte, len(v))
187+
copy(val, v)
188+
})
189+
190+
if err != nil {
191+
return nil, errors.Wrap(err, "Retrieving head value")
192+
}
184193
// lastUsedCasCounter will either be the value stored in !badger!head, or some subsequently
185194
// written value log entry that we replay. (Subsequent value log entries might be _less_
186195
// than lastUsedCasCounter, if there was value log gc so we have to max() values while
@@ -389,53 +398,29 @@ func (s *KV) getMemTables() ([]*skl.Skiplist, func()) {
389398
}
390399
}
391400

392-
// FillValue populates item with a value.
393-
//
394-
// item must be a valid KVItem returned by Badger during iteration. This method
395-
// could be used to fetch values explicitly during a key-only iteration
396-
// (FetchValues is set to false). It is useful for example, if values are
397-
// required for some keys only.
398-
//
399-
// This method should not be called when iteration is performed with
400-
// FetchValues set to true, as it will cause additional copying.
401-
//
402-
// Multiple calls to this method will result in multiple copies from the value
403-
// log. It is the caller’s responsibility to make sure they don’t call this
404-
// method more than once.
405-
func (s *KV) FillValue(item *KVItem) error {
406-
// Wait for any pending fill operations to finish.
407-
item.wg.Wait()
408-
item.wg.Add(1)
409-
defer item.wg.Done()
410-
return s.fillItem(item)
411-
}
412-
413-
func (s *KV) fillItem(item *KVItem) error {
401+
func (s *KV) yieldItemValue(item *KVItem, consumer func([]byte)) error {
414402
if !item.hasValue() {
415-
item.val = nil
403+
consumer(nil)
416404
return nil
417405
}
418406

419407
if item.slice == nil {
420408
item.slice = new(y.Slice)
421409
}
410+
422411
if (item.meta & BitValuePointer) == 0 {
423-
item.val = item.slice.Resize(len(item.vptr))
424-
copy(item.val, item.vptr)
412+
val := item.slice.Resize(len(item.vptr))
413+
copy(val, item.vptr)
414+
consumer(val)
425415
return nil
426416
}
427417

428418
var vp valuePointer
429419
vp.Decode(item.vptr)
430-
entry, err := s.vlog.Read(vp, item.slice)
420+
err := s.vlog.Read(vp, item.slice, consumer)
431421
if err != nil {
432-
return errors.Wrapf(err, "Unable to read from value log: %+v", vp)
433-
}
434-
if (entry.Meta & BitDelete) != 0 { // Is a tombstone.
435-
item.val = nil
436-
return nil
422+
return err
437423
}
438-
item.val = entry.Value
439424
return nil
440425
}
441426

@@ -463,18 +448,14 @@ func (s *KV) Get(key []byte, item *KVItem) error {
463448
if err != nil {
464449
return errors.Wrapf(err, "KV::Get key: %q", key)
465450
}
466-
if item.slice == nil {
467-
item.slice = new(y.Slice)
468-
}
451+
469452
item.meta = vs.Meta
470453
item.userMeta = vs.UserMeta
471454
item.casCounter = vs.CASCounter
472455
item.key = key
456+
item.kv = s
473457
item.vptr = vs.Value
474458

475-
if err := s.fillItem(item); err != nil {
476-
return errors.Wrapf(err, "KV::Get key: %q", key)
477-
}
478459
return nil
479460
}
480461

0 commit comments

Comments
 (0)