Skip to content

Commit 4e2b8b2

Browse files
committed
sstable: add AddWithBlobHandle
Add a new AddWithBlobHandle method to the sstable.RawWriter interface allowing the writing of key-value pairs with values that encode the location of the value out-of-band in an external blob file. Informs #112.
1 parent 422c212 commit 4e2b8b2

File tree

16 files changed

+581
-53
lines changed

16 files changed

+581
-53
lines changed

sstable/blob/blob.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
1717
"github.com/cockroachdb/pebble/sstable/block"
1818
"github.com/cockroachdb/pebble/sstable/valblk"
19-
"github.com/cockroachdb/redact"
2019
)
2120

2221
var (
@@ -79,25 +78,6 @@ type FileWriterStats struct {
7978
FileLen uint64
8079
}
8180

82-
// Handle describes the location of a value stored within a blob file.
83-
type Handle struct {
84-
FileNum base.DiskFileNum
85-
BlockNum uint32
86-
OffsetInBlock uint32
87-
ValueLen uint32
88-
}
89-
90-
// String implements the fmt.Stringer interface.
91-
func (h Handle) String() string {
92-
return redact.StringWithoutMarkers(h)
93-
}
94-
95-
// SafeFormat implements redact.SafeFormatter.
96-
func (h Handle) SafeFormat(w redact.SafePrinter, _ rune) {
97-
w.Printf("(%s,blk%d[%d:%d])",
98-
h.FileNum, h.BlockNum, h.OffsetInBlock, h.OffsetInBlock+h.ValueLen)
99-
}
100-
10181
// A FileWriter writes a blob file.
10282
type FileWriter struct {
10383
fileNum base.DiskFileNum

sstable/blob/blob_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/datadriven"
1717
"github.com/cockroachdb/pebble/objstorage"
1818
"github.com/cockroachdb/pebble/sstable/block"
19+
"github.com/stretchr/testify/require"
1920
)
2021

2122
func TestBlobWriter(t *testing.T) {
@@ -81,3 +82,37 @@ func printFileWriterStats(w io.Writer, stats FileWriterStats) {
8182
fmt.Fprintf(w, " UncompressedValueBytes: %d\n", stats.UncompressedValueBytes)
8283
fmt.Fprintf(w, " FileLen: %d\n", stats.FileLen)
8384
}
85+
86+
func TestHandleRoundtrip(t *testing.T) {
87+
handles := []InlineHandle{
88+
{
89+
InlineHandlePreface: InlineHandlePreface{
90+
ReferenceIndex: 0,
91+
ValueLen: 29357353,
92+
},
93+
HandleSuffix: HandleSuffix{
94+
BlockNum: 194,
95+
OffsetInBlock: 32911,
96+
},
97+
},
98+
{
99+
InlineHandlePreface: InlineHandlePreface{
100+
ReferenceIndex: 129,
101+
ValueLen: 205,
102+
},
103+
HandleSuffix: HandleSuffix{
104+
BlockNum: 2,
105+
OffsetInBlock: 20,
106+
},
107+
},
108+
}
109+
110+
for _, h := range handles {
111+
var buf [MaxInlineHandleLength]byte
112+
n := h.Encode(buf[:])
113+
preface, rem := DecodeInlineHandlePrefix(buf[:n])
114+
suffix := DecodeHandleSuffix(rem)
115+
require.Equal(t, h.InlineHandlePreface, preface)
116+
require.Equal(t, h.HandleSuffix, suffix)
117+
}
118+
}

sstable/blob/fetcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ func (r *ValueFetcher) Init(rp ReaderProvider, env block.ReadEnv) {
7878
func (r *ValueFetcher) Fetch(
7979
ctx context.Context, handle []byte, fileNum base.DiskFileNum, valLen uint32, buf []byte,
8080
) (val []byte, callerOwned bool, err error) {
81-
vblkHandle := valblk.DecodeRemainingHandle(handle)
81+
handleSuffix := DecodeHandleSuffix(handle)
8282
vh := Handle{
8383
FileNum: fileNum,
84-
BlockNum: vblkHandle.BlockNum,
85-
OffsetInBlock: vblkHandle.OffsetInBlock,
8684
ValueLen: valLen,
85+
BlockNum: handleSuffix.BlockNum,
86+
OffsetInBlock: handleSuffix.OffsetInBlock,
8787
}
8888
v, err := r.retrieve(ctx, vh)
8989
return v, false, err

sstable/blob/handle.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package blob
6+
7+
import (
8+
"encoding/binary"
9+
"unsafe"
10+
11+
"github.com/cockroachdb/pebble/internal/base"
12+
"github.com/cockroachdb/pebble/sstable/valblk"
13+
"github.com/cockroachdb/redact"
14+
)
15+
16+
// MaxInlineHandleLength is the maximum length of an inline blob handle.
17+
//
18+
// Handle fields are varint encoded, so maximum 5 bytes each.
19+
const MaxInlineHandleLength = 4 * binary.MaxVarintLen32
20+
21+
// Handle describes the location of a value stored within a blob file.
22+
type Handle struct {
23+
FileNum base.DiskFileNum
24+
BlockNum uint32
25+
OffsetInBlock uint32
26+
ValueLen uint32
27+
}
28+
29+
// String implements the fmt.Stringer interface.
30+
func (h Handle) String() string {
31+
return redact.StringWithoutMarkers(h)
32+
}
33+
34+
// SafeFormat implements redact.SafeFormatter.
35+
func (h Handle) SafeFormat(w redact.SafePrinter, _ rune) {
36+
w.Printf("(%s,blk%d[%d:%d])",
37+
h.FileNum, h.BlockNum, h.OffsetInBlock, h.OffsetInBlock+h.ValueLen)
38+
}
39+
40+
// TODO(jackson): Consider encoding the handle's data using columnar block
41+
// primitives, rather than a variable-width encoding in the value column.
42+
43+
// InlineHandle describes a handle as it is encoded within a sstable block. The
44+
// inline handle does not encode the blob file number outright. Instead it
45+
// encodes an index into the containing sstable's BlobReferences.
46+
//
47+
// The inline handle is composed of two parts: a preface (InlineHandlePreface)
48+
// and a suffix (HandleSuffix). The preface is eagerly decoded from the encoded
49+
// handle when returning an InternalValue to higher layers. The remaining bits
50+
// (the suffix) are decoded only when the value is being fetched from the blob
51+
// file.
52+
type InlineHandle struct {
53+
InlineHandlePreface
54+
HandleSuffix
55+
}
56+
57+
// InlineHandlePreface is the prefix of an inline handle. It's eagerly decoded
58+
// when returning an InternalValue to higher layers.
59+
type InlineHandlePreface struct {
60+
ReferenceIndex uint32
61+
ValueLen uint32
62+
}
63+
64+
// HandleSuffix is the suffix of an inline handle. It's decoded only when the
65+
// value is being fetched from the blob file.
66+
type HandleSuffix struct {
67+
BlockNum uint32
68+
OffsetInBlock uint32
69+
}
70+
71+
// String implements the fmt.Stringer interface.
72+
func (h InlineHandle) String() string {
73+
return redact.StringWithoutMarkers(h)
74+
}
75+
76+
// SafeFormat implements redact.SafeFormatter.
77+
func (h InlineHandle) SafeFormat(w redact.SafePrinter, _ rune) {
78+
w.Printf("(f%d,blk%d[%d:%d])",
79+
h.ReferenceIndex, h.BlockNum, h.OffsetInBlock, h.OffsetInBlock+h.ValueLen)
80+
}
81+
82+
// Encode encodes the inline handle into the provided buffer, returning the
83+
// number of bytes encoded.
84+
func (h InlineHandle) Encode(b []byte) int {
85+
n := 0
86+
n += binary.PutUvarint(b[n:], uint64(h.ReferenceIndex))
87+
n += valblk.EncodeHandle(b[n:], valblk.Handle{
88+
BlockNum: h.BlockNum,
89+
OffsetInBlock: h.OffsetInBlock,
90+
ValueLen: h.ValueLen,
91+
})
92+
return n
93+
}
94+
95+
// DecodeInlineHandlePrefix decodes the blob reference index and value length
96+
// from the beginning of a variable-width encoded InlineHandle.
97+
func DecodeInlineHandlePrefix(src []byte) (InlineHandlePreface, []byte) {
98+
ptr := unsafe.Pointer(&src[0])
99+
var refIdx uint32
100+
if a := *((*uint8)(ptr)); a < 128 {
101+
refIdx = uint32(a)
102+
src = src[1:]
103+
} else if a, b := a&0x7f, *((*uint8)(unsafe.Add(ptr, 1))); b < 128 {
104+
refIdx = uint32(b)<<7 | uint32(a)
105+
src = src[2:]
106+
} else if b, c := b&0x7f, *((*uint8)(unsafe.Add(ptr, 2))); c < 128 {
107+
refIdx = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
108+
src = src[3:]
109+
} else if c, d := c&0x7f, *((*uint8)(unsafe.Add(ptr, 3))); d < 128 {
110+
refIdx = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
111+
src = src[4:]
112+
} else {
113+
d, e := d&0x7f, *((*uint8)(unsafe.Add(ptr, 4)))
114+
refIdx = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
115+
src = src[5:]
116+
}
117+
118+
ptr = unsafe.Pointer(&src[0])
119+
var valueLen uint32
120+
if a := *((*uint8)(ptr)); a < 128 {
121+
valueLen = uint32(a)
122+
src = src[1:]
123+
} else if a, b := a&0x7f, *((*uint8)(unsafe.Add(ptr, 1))); b < 128 {
124+
valueLen = uint32(b)<<7 | uint32(a)
125+
src = src[2:]
126+
} else if b, c := b&0x7f, *((*uint8)(unsafe.Add(ptr, 2))); c < 128 {
127+
valueLen = uint32(c)<<14 | uint32(b)<<7 | uint32(a)
128+
src = src[3:]
129+
} else if c, d := c&0x7f, *((*uint8)(unsafe.Add(ptr, 3))); d < 128 {
130+
valueLen = uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
131+
src = src[4:]
132+
} else {
133+
d, e := d&0x7f, *((*uint8)(unsafe.Add(ptr, 4)))
134+
valueLen = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
135+
src = src[5:]
136+
}
137+
138+
return InlineHandlePreface{
139+
ReferenceIndex: refIdx,
140+
ValueLen: valueLen,
141+
}, src
142+
}
143+
144+
// DecodeHandleSuffix decodes the block number and offset in block from the
145+
// encoded handle.
146+
func DecodeHandleSuffix(src []byte) HandleSuffix {
147+
h := valblk.DecodeRemainingHandle(src)
148+
return HandleSuffix{
149+
BlockNum: h.BlockNum,
150+
OffsetInBlock: h.OffsetInBlock,
151+
}
152+
}

sstable/block/kv.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ func InPlaceValuePrefix(setHasSameKeyPrefix bool) ValuePrefix {
8181
}
8282

8383
// BlobValueHandlePrefix returns the ValuePrefix for a blob.
84-
func BlobValueHandlePrefix(setHasSameKeyPrefix bool) ValuePrefix {
85-
prefix := valueKindIsBlobHandle
84+
func BlobValueHandlePrefix(setHasSameKeyPrefix bool, attr base.ShortAttribute) ValuePrefix {
85+
prefix := valueKindIsBlobHandle | ValuePrefix(attr)
8686
if setHasSameKeyPrefix {
8787
prefix = prefix | setHasSameKeyPrefixMask
8888
}

sstable/block/kv_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func TestValuePrefix(t *testing.T) {
5050
isValueBlockHandle: false,
5151
isBlobHandle: true,
5252
setHasSamePrefix: true,
53+
attr: 2,
5354
},
5455
}
5556
for _, tc := range testCases {
@@ -58,7 +59,7 @@ func TestValuePrefix(t *testing.T) {
5859
if tc.isValueBlockHandle {
5960
prefix = ValueBlockHandlePrefix(tc.setHasSamePrefix, tc.attr)
6061
} else if tc.isBlobHandle {
61-
prefix = BlobValueHandlePrefix(tc.setHasSamePrefix)
62+
prefix = BlobValueHandlePrefix(tc.setHasSamePrefix, tc.attr)
6263
} else {
6364
prefix = InPlaceValuePrefix(tc.setHasSamePrefix)
6465
}

sstable/colblk_writer.go

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/pebble/internal/invariants"
2020
"github.com/cockroachdb/pebble/internal/keyspan"
2121
"github.com/cockroachdb/pebble/objstorage"
22+
"github.com/cockroachdb/pebble/sstable/blob"
2223
"github.com/cockroachdb/pebble/sstable/block"
2324
"github.com/cockroachdb/pebble/sstable/colblk"
2425
"github.com/cockroachdb/pebble/sstable/rowblk"
@@ -377,7 +378,52 @@ func (w *RawColumnWriter) Add(key InternalKey, value []byte, forceObsolete bool)
377378
valuePrefix = block.InPlaceValuePrefix(eval.kcmp.PrefixEqual())
378379
}
379380
}
381+
return w.add(key, len(value), valueStoredWithKey, valuePrefix, eval)
382+
}
383+
384+
// AddWithBlobHandle implements the RawWriter interface.
385+
func (w *RawColumnWriter) AddWithBlobHandle(
386+
key InternalKey, h blob.InlineHandle, attr base.ShortAttribute, forceObsolete bool,
387+
) error {
388+
// Blob value handles require at least TableFormatPebblev6.
389+
if w.opts.TableFormat <= TableFormatPebblev5 {
390+
w.err = errors.Newf("pebble: blob value handles are not supported in %s", w.opts.TableFormat.String())
391+
return w.err
392+
}
393+
switch key.Kind() {
394+
case base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet,
395+
base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
396+
return errors.Newf("%s must be added through EncodeSpan", key.Kind())
397+
case base.InternalKeyKindMerge:
398+
return errors.Errorf("MERGE does not support blob value handles")
399+
}
400+
401+
eval, err := w.evaluatePoint(key, int(h.ValueLen))
402+
if err != nil {
403+
return err
404+
}
405+
eval.isObsolete = eval.isObsolete || forceObsolete
406+
w.prevPointKey.trailer = key.Trailer
407+
w.prevPointKey.isObsolete = eval.isObsolete
380408

409+
n := h.Encode(w.tmp[:])
410+
valueStoredWithKey := w.tmp[:n]
411+
valuePrefix := block.BlobValueHandlePrefix(eval.kcmp.PrefixEqual(), attr)
412+
err = w.add(key, int(h.ValueLen), valueStoredWithKey, valuePrefix, eval)
413+
if err != nil {
414+
return err
415+
}
416+
w.props.NumValuesInBlobFiles++
417+
return nil
418+
}
419+
420+
func (w *RawColumnWriter) add(
421+
key InternalKey,
422+
valueLen int,
423+
valueStoredWithKey []byte,
424+
valuePrefix block.ValuePrefix,
425+
eval pointKeyEvaluation,
426+
) error {
381427
// Append the key to the data block. We have NOT yet committed to
382428
// including the key in the block. The data block writer permits us to
383429
// finish the block excluding the last-appended KV.
@@ -406,12 +452,12 @@ func (w *RawColumnWriter) Add(key InternalKey, value []byte, forceObsolete bool)
406452
}
407453

408454
for i := range w.blockPropCollectors {
409-
v := value
410-
if key.Kind() == base.InternalKeyKindSet {
411-
// Values for SET are not required to be in-place, and in the future
412-
// may not even be read by the compaction, so pass nil values. Block
413-
// property collectors in such Pebble DB's must not look at the
414-
// value.
455+
v := valueStoredWithKey
456+
if key.Kind() == base.InternalKeyKindSet || key.Kind() == base.InternalKeyKindSetWithDelete || !valuePrefix.IsInPlaceValue() {
457+
// Values for SET, SETWITHDEL keys are not required to be in-place,
458+
// and may not even be read by the compaction, so pass nil values.
459+
// Block property collectors in such Pebble DB's must not look at
460+
// the value.
415461
v = nil
416462
}
417463
if err := w.blockPropCollectors[i].AddPointKey(key, v); err != nil {
@@ -437,12 +483,12 @@ func (w *RawColumnWriter) Add(key InternalKey, value []byte, forceObsolete bool)
437483
w.dataBlock.deletionSize += len(key.UserKey)
438484
case InternalKeyKindDeleteSized:
439485
var size uint64
440-
if len(value) > 0 {
486+
if len(valueStoredWithKey) > 0 {
441487
var n int
442-
size, n = binary.Uvarint(value)
488+
size, n = binary.Uvarint(valueStoredWithKey)
443489
if n <= 0 {
444490
return errors.Newf("%s key's value (%x) does not parse as uvarint",
445-
errors.Safe(key.Kind().String()), value)
491+
errors.Safe(key.Kind().String()), valueStoredWithKey)
446492
}
447493
}
448494
w.props.NumDeletions++
@@ -455,7 +501,7 @@ func (w *RawColumnWriter) Add(key InternalKey, value []byte, forceObsolete bool)
455501
w.props.NumMergeOperands++
456502
}
457503
w.props.RawKeySize += uint64(key.Size())
458-
w.props.RawValueSize += uint64(len(value))
504+
w.props.RawValueSize += uint64(valueLen)
459505
return nil
460506
}
461507

0 commit comments

Comments
 (0)