Skip to content

Commit 51b1e1b

Browse files
committed
internal/compact: preserve blob references in Iter
Adapt the compaction iterator to preserve blob file references by cloning the LazyValues. Compactions that do not rewrite blob files will avoid ever retrieving the corresponding values. Additionally, fix LazyValue.Clone to copy the BlobFileNum of the LazyFetcher. Informs #112.
1 parent b3f3a50 commit 51b1e1b

File tree

6 files changed

+146
-17
lines changed

6 files changed

+146
-17
lines changed

internal/base/lazy_value.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func (lv *LazyValue) fetchValue(
216216
if !f.fetched {
217217
f.fetched = true
218218
f.value, f.callerOwned, f.err = f.Fetcher.Fetch(ctx,
219-
lv.ValueOrHandle, lv.Fetcher.BlobFileNum, lv.Fetcher.Attribute.ValueLen, buf)
219+
lv.ValueOrHandle, f.BlobFileNum, f.Attribute.ValueLen, buf)
220220
}
221221
return f.value, f.callerOwned, f.err
222222
}
@@ -262,8 +262,9 @@ func (lv *LazyValue) Clone(buf []byte, fetcher *LazyFetcher) (LazyValue, []byte)
262262
var lvCopy LazyValue
263263
if lv.Fetcher != nil {
264264
*fetcher = LazyFetcher{
265-
Fetcher: lv.Fetcher.Fetcher,
266-
Attribute: lv.Fetcher.Attribute,
265+
Fetcher: lv.Fetcher.Fetcher,
266+
Attribute: lv.Fetcher.Attribute,
267+
BlobFileNum: lv.Fetcher.BlobFileNum,
267268
// Not copying anything that has been extracted.
268269
}
269270
lvCopy.Fetcher = fetcher

internal/base/lazy_value_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@ func TestLazyValue(t *testing.T) {
5858
numCalls++
5959
require.Equal(t, []byte("foo-handle"), handle)
6060
require.Equal(t, uint32(3), valLen)
61+
require.Equal(t, DiskFileNum(90), blobFileNum)
6162
return fooBytes1, callerOwned, nil
6263
}),
63-
Attribute: AttributeAndLen{ValueLen: 3, ShortAttribute: 7},
64+
Attribute: AttributeAndLen{ValueLen: 3, ShortAttribute: 7},
65+
BlobFileNum: 90,
6466
},
6567
}
6668
require.Equal(t, []byte("foo"), getValue(fooLV3, callerOwned))

internal/base/value.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ func MakeInPlaceValue(val []byte) InternalValue {
2727
return InternalValue{lazyValue: LazyValue{ValueOrHandle: val}}
2828
}
2929

30+
// IsBlobValueHandle returns true iff the value is a blob value handle, pointing
31+
// to a value stored externally in a blob file.
32+
func (v *InternalValue) IsBlobValueHandle() bool {
33+
f := v.lazyValue.Fetcher
34+
return f != nil && f.BlobFileNum > 0
35+
}
36+
3037
// IsInPlaceValue returns true iff the value was stored in-place and does not
3138
// need to be fetched externally.
3239
func (v *InternalValue) IsInPlaceValue() bool {

internal/compact/iterator.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,9 @@ type Iter struct {
192192
// Temporary buffer used for storing the previous value, which may be an
193193
// unsafe, i.iter-owned slice that could be altered when the iterator is
194194
// advanced.
195-
valueBuf []byte
195+
valueBuf []byte
196+
// valueFetcher is used by saveValue when Cloning InternalValues.
197+
valueFetcher base.LazyFetcher
196198
iterKV *base.InternalKV
197199
iterStripeChange stripeChangeType
198200
// skip indicates whether the remaining entries in the current snapshot
@@ -878,6 +880,9 @@ func (i *Iter) mergeNext(valueMerger base.ValueMerger) {
878880
// value and return. We change the kind of the resulting key to a
879881
// Set so that it shadows keys in lower levels. That is:
880882
// MERGE + (SET*) -> SET.
883+
//
884+
// Because we must merge the value, we must retrieve it regardless
885+
// of whether the value is a blob reference.
881886
var v []byte
882887
var callerOwned bool
883888
v, callerOwned, i.err = i.iterKV.Value(i.valueBuf[:0])
@@ -1289,11 +1294,20 @@ func (i *Iter) saveKey() {
12891294
//
12901295
// If the value is in-place, this copies it into i.valueBuf. If the value is in
12911296
// a value block, it retrieves the value from the block (possibly storing the
1292-
// result into i.valueBuf).
1297+
// result into i.valueBuf). If the value is stored in an external blob file, the
1298+
// value is cloned (InternalValue.Clone) without retrieving it from the external
1299+
// file.
1300+
//
1301+
// Note that because saveValue uses i.valueBuf and i.valueFetcher to avoid
1302+
// allocations, values saved by saveValue are only valid until the next call to
1303+
// saveValue.
12931304
func (i *Iter) saveValue() {
1294-
// TODO(jackson): With the introduction of values stored in separate
1295-
// physical blob files, this should begin to Clone LazyValues that are
1296-
// stored in blob reference files when non-rewriting blob files.
1305+
// Clone blob value handles to defer the retrieval of the value.
1306+
if i.iterKV.V.IsBlobValueHandle() {
1307+
i.kv.V, i.valueBuf = i.iterKV.V.Clone(i.valueBuf, &i.valueFetcher)
1308+
return
1309+
}
1310+
12971311
v, callerOwned, err := i.iterKV.Value(i.valueBuf[:0])
12981312
if err != nil {
12991313
i.err = err

internal/compact/iterator_test.go

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,21 @@ package compact
66

77
import (
88
"bytes"
9+
"context"
910
"encoding/binary"
1011
"fmt"
1112
"io"
1213
"slices"
1314
"strconv"
1415
"strings"
1516
"testing"
17+
"unicode"
1618

1719
"github.com/cockroachdb/datadriven"
1820
"github.com/cockroachdb/pebble/internal/base"
1921
"github.com/cockroachdb/pebble/internal/keyspan"
2022
"github.com/cockroachdb/pebble/internal/rangekey"
23+
"github.com/cockroachdb/pebble/sstable/valblk"
2124
"github.com/stretchr/testify/require"
2225
)
2326

@@ -53,7 +56,6 @@ func TestCompactionIter(t *testing.T) {
5356
var snapshots Snapshots
5457
var elideTombstones bool
5558
var allowZeroSeqnum bool
56-
5759
var ineffectualSingleDeleteKeys []string
5860
var invariantViolationSingleDeleteKeys []string
5961
resetSingleDelStats := func() {
@@ -133,16 +135,18 @@ func TestCompactionIter(t *testing.T) {
133135
continue
134136
}
135137

136-
var value []byte
138+
var iv base.InternalValue
137139
if strings.HasPrefix(key[j+1:], "varint(") {
138140
valueStr := strings.TrimSuffix(strings.TrimPrefix(key[j+1:], "varint("), ")")
139141
v, err := strconv.ParseUint(valueStr, 10, 64)
140142
require.NoError(t, err)
141-
value = binary.AppendUvarint([]byte(nil), v)
143+
iv = base.MakeInPlaceValue(binary.AppendUvarint([]byte(nil), v))
144+
} else if strings.HasPrefix(key[j+1:], "blobref(") {
145+
iv = decodeBlobReference(t, key[j+1:])
142146
} else {
143-
value = []byte(key[j+1:])
147+
iv = base.MakeInPlaceValue([]byte(key[j+1:]))
144148
}
145-
kvs = append(kvs, base.MakeInternalKV(ik, value))
149+
kvs = append(kvs, base.InternalKV{K: ik, V: iv})
146150
}
147151
rangeDelFragmenter.Finish()
148152
return ""
@@ -202,9 +206,15 @@ func TestCompactionIter(t *testing.T) {
202206
}
203207
var value []byte
204208
if kv != nil {
205-
var err error
206-
value, _, err = kv.Value(nil)
207-
require.NoError(t, err)
209+
if kv.V.IsBlobValueHandle() {
210+
lv := kv.V.LazyValue()
211+
value = []byte(fmt.Sprintf("<blobref(%s, encodedHandle=%x, valLen=%d)>",
212+
lv.Fetcher.BlobFileNum, lv.ValueOrHandle, lv.Fetcher.Attribute.ValueLen))
213+
} else {
214+
var err error
215+
value, _, err = kv.Value(nil)
216+
require.NoError(t, err)
217+
}
208218
}
209219

210220
if kv != nil {
@@ -267,6 +277,60 @@ func TestCompactionIter(t *testing.T) {
267277
runTest(t, "testdata/iter_delete_sized")
268278
}
269279

280+
// mockBlobValueFetcher is a dummy ValueFetcher implementation which produces
281+
// string values referencing the blobref.
282+
type mockBlobValueFetcher struct{}
283+
284+
var _ base.ValueFetcher = mockBlobValueFetcher{}
285+
286+
func (mockBlobValueFetcher) Fetch(
287+
ctx context.Context, handle []byte, fileNum base.DiskFileNum, valLen uint32, buf []byte,
288+
) (val []byte, callerOwned bool, err error) {
289+
s := fmt.Sprintf("<fetched value from blobref(%s, encodedHandle=%x, valLen=%d)>",
290+
fileNum, handle, valLen)
291+
return []byte(s), false, nil
292+
}
293+
294+
// decodeBlobReference decodes a blob reference from a debug string. It expects
295+
// a value of the form: blobref(<filenum>, blk<blocknum>, <offset>, <valLen>).
296+
// For example: blobref(000124, blk255, 10, 9235)
297+
func decodeBlobReference(t testing.TB, ref string) base.InternalValue {
298+
fields := strings.FieldsFunc(strings.TrimSuffix(strings.TrimPrefix(ref, "blobref("), ")"),
299+
func(r rune) bool { return r == ',' || unicode.IsSpace(r) })
300+
require.Equal(t, 4, len(fields))
301+
fileNum, err := strconv.ParseUint(fields[0], 10, 64)
302+
require.NoError(t, err)
303+
blockNum, err := strconv.ParseUint(strings.TrimPrefix(fields[1], "blk"), 10, 32)
304+
require.NoError(t, err)
305+
off, err := strconv.ParseUint(fields[2], 10, 32)
306+
require.NoError(t, err)
307+
valLen, err := strconv.ParseUint(fields[3], 10, 32)
308+
require.NoError(t, err)
309+
310+
// TODO(jackson): Support short (and long, when introduced) attributes.
311+
return base.MakeLazyValue(base.LazyValue{
312+
ValueOrHandle: encodeRemainingHandle(uint32(blockNum), uint32(off)),
313+
Fetcher: &base.LazyFetcher{
314+
Fetcher: mockBlobValueFetcher{},
315+
BlobFileNum: base.DiskFileNum(fileNum),
316+
Attribute: base.AttributeAndLen{
317+
ValueLen: uint32(valLen),
318+
},
319+
},
320+
})
321+
}
322+
323+
func encodeRemainingHandle(blockNum uint32, offsetInBlock uint32) []byte {
324+
// TODO(jackson): Pull this into a common helper.
325+
dst := make([]byte, valblk.HandleMaxLen)
326+
n := valblk.EncodeHandle(dst, valblk.Handle{
327+
ValueLen: 0,
328+
BlockNum: blockNum,
329+
OffsetInBlock: offsetInBlock,
330+
})
331+
return dst[1:n]
332+
}
333+
270334
// makeInputIters creates the iterators that can be used to create a compaction
271335
// Iter.
272336
func makeInputIters(

internal/compact/testdata/iter

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,3 +1124,44 @@ next
11241124
----
11251125
a#3,SET:val
11261126
.
1127+
1128+
# Define an input sequence of simple blob references, all pointing into the same
1129+
# blob file block.
1130+
1131+
define
1132+
a.SET.9:blobref(000294, blk2, 10, 20)
1133+
b.SET.3:blobref(000294, blk2, 30, 5)
1134+
c.SETWITHDEL.8:blobref(000294, blk2, 35, 100)
1135+
d.SET.2:blobref(000294, blk2, 135, 4)
1136+
----
1137+
1138+
# An iterator should preserve blob references.
1139+
1140+
iter
1141+
first
1142+
next
1143+
next
1144+
next
1145+
----
1146+
a#9,SET:<blobref(000294, encodedHandle=020a, valLen=20)>
1147+
b#3,SET:<blobref(000294, encodedHandle=021e, valLen=5)>
1148+
c#8,SETWITHDEL:<blobref(000294, encodedHandle=0223, valLen=100)>
1149+
d#2,SET:<blobref(000294, encodedHandle=028701, valLen=4)>
1150+
1151+
# The iterator may need to fetch a blob value if it's an operand to the merge
1152+
# operator.
1153+
1154+
define
1155+
a.SET.3:blobref(000294, blk2, 10, 20)
1156+
b.MERGE.9:mergekeyvalue
1157+
b.SETWITHDEL.8:blobref(000294, blk2, 35, 100)
1158+
----
1159+
1160+
iter
1161+
first
1162+
next
1163+
next
1164+
----
1165+
a#3,SET:<blobref(000294, encodedHandle=020a, valLen=20)>
1166+
b#9,SET:<fetched value from blobref(000294, encodedHandle=0223, valLen=100)>mergekeyvalue[base]
1167+
.

0 commit comments

Comments
 (0)