Skip to content

Commit 495be96

Browse files
committed
valsep: create SSTBlobWriter to write external sst with blob files
SSTBlobWriter is an SST writer that can be configured to separate values into new blob files. Once closed, the writer can return metadata about the written sst and its blob files.
1 parent c9a3b2a commit 495be96

File tree

6 files changed

+444
-0
lines changed

6 files changed

+444
-0
lines changed

sstable/colblk_writer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,14 @@ func (w *RawColumnWriter) IsPrefixEqualPrev(k []byte) bool {
268268
return w.dataBlock.KeyWriter.ComparePrev(k).PrefixEqual()
269269
}
270270

271+
// PrevPointKeyKind implements the RawWriter interface.
272+
func (w *RawColumnWriter) PrevPointKeyKind() base.InternalKeyKind {
273+
if w == nil || w.dataBlock.Rows() == 0 {
274+
return base.InternalKeyKindInvalid
275+
}
276+
return w.prevPointKey.trailer.Kind()
277+
}
278+
271279
// SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
272280
// be used internally by Pebble.
273281
func (w *RawColumnWriter) SetSnapshotPinnedProperties(

sstable/rowblk_writer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,6 +1408,14 @@ func (w *RawRowWriter) IsPrefixEqualPrev(k []byte) bool {
14081408
return bytes.Equal(w.split.Prefix(k), w.split.Prefix(w.dataBlockBuf.dataBlock.CurUserKey()))
14091409
}
14101410

1411+
// PrevPointKeyKind implements the RawWriter interface.
1412+
func (w *RawRowWriter) PrevPointKeyKind() base.InternalKeyKind {
1413+
if w == nil || w.dataBlockBuf.dataBlock.EntryCount() == 0 {
1414+
return base.InternalKeyKindInvalid
1415+
}
1416+
return w.dataBlockBuf.dataBlock.CurKey().Kind()
1417+
}
1418+
14111419
// EncodeSpan encodes the keys in the given span. The span can contain either
14121420
// only RANGEDEL keys or only range keys.
14131421
//

sstable/writer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,9 @@ type RawWriter interface {
363363
//
364364
// Must not be called after Writer is closed.
365365
IsPrefixEqualPrev(k []byte) bool
366+
// PrevPointKeyKind returns the InternalKeyKind of the last point key written
367+
// to the writer. Must not be called after Writer is closed.
368+
PrevPointKeyKind() base.InternalKeyKind
366369

367370
// SetValueSeparationProps sets the value separation props that were used when
368371
// writing this sstable. This is recorded in the sstable properties.

valsep/sst_blob_writer.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package valsep
6+
7+
import (
8+
"github.com/cockroachdb/errors"
9+
"github.com/cockroachdb/pebble/internal/base"
10+
"github.com/cockroachdb/pebble/objstorage"
11+
"github.com/cockroachdb/pebble/sstable"
12+
"github.com/cockroachdb/pebble/sstable/blob"
13+
"github.com/cockroachdb/pebble/sstable/block"
14+
)
15+
16+
// SSTBlobWriter writes an sstable and 0 or more blob value files
17+
// for ingesting into pebble. Values are extracted depending on the
18+
// value separation strategy defined. If ValueSeparation is
19+
// neverSeparateValues, this behaves like sstable.Writer and no blob
20+
// files are written.
21+
type SSTBlobWriter struct {
22+
// Promote methods from sstable.Writer.
23+
*sstable.Writer
24+
valSep ValueSeparation
25+
err error
26+
27+
blobFileNum base.DiskFileNum
28+
closed bool
29+
// isStrictObsolete is true if the writer is configured to write and enforce
30+
// a 'strict obsolete' sstable. This includes prohibiting the addition of
31+
// MERGE keys. See the documentation in format.go for more details.
32+
isStrictObsolete bool
33+
34+
kvScratch base.InternalKV
35+
// Metadata on the blob files written.
36+
blobFilesMeta []blob.FileWriterStats
37+
}
38+
39+
type NewBlobFile func() (objstorage.Writable, error)
40+
41+
var neverSeparateValues = &NeverSeparateValues{}
42+
43+
type SSTBlobWriterOptions struct {
44+
SSTWriterOpts sstable.WriterOptions
45+
BlobWriterOpts blob.FileWriterOptions
46+
// BlobFilesDisabled is true if value separation into blob files
47+
// is disabled and the writer will behave like an sstable.Writer
48+
// in this case. Note that values may still be separated into a
49+
// value block in the same sstable.
50+
BlobFilesDisabled bool
51+
// The minimum size required for a value to be separated into a
52+
// blob file. This value may be overridden by the span policy.
53+
ValueSeparationMinSize int
54+
// The minimum size required for a value to be separated into a
55+
// blob file if it is classified to be MVCC garbage. A value
56+
// of 0 means all MVCC garbage values are eligible for separation.
57+
// To disable MVCC garbage value separation, set
58+
// DisableSeparationBySuffix in the SpanPolicy's ValueStoragePolicy.
59+
MVCCGarbageValueSeparationMinSize int
60+
// SpanPolicy specifies the specific policies applied to the table span.
61+
// When using the external writer, there should be 1 span policy
62+
// applied to the entire sstable.
63+
SpanPolicy base.SpanPolicy
64+
NewBlobFileFn NewBlobFile
65+
}
66+
67+
// NewSSTBlobWriter returns a new SSTBlobWriter that writes to the provided
68+
// sstHandle. The writer uses the provided options to configure both the sstable
69+
// writer and the blob file writer.
70+
func NewSSTBlobWriter(sstHandle objstorage.Writable, opts SSTBlobWriterOptions) *SSTBlobWriter {
71+
writer := &SSTBlobWriter{
72+
isStrictObsolete: opts.SSTWriterOpts.IsStrictObsolete,
73+
}
74+
75+
if opts.SpanPolicy.PreferFastCompression && opts.SSTWriterOpts.Compression != block.NoCompression {
76+
opts.SSTWriterOpts.Compression = block.FastestCompression
77+
}
78+
79+
writer.Writer = sstable.NewWriter(sstHandle, opts.SSTWriterOpts)
80+
81+
// Create the value separator.
82+
minimumValueSize := opts.ValueSeparationMinSize
83+
if opts.SpanPolicy.ValueStoragePolicy.OverrideBlobSeparationMinimumSize > 0 {
84+
minimumValueSize = opts.SpanPolicy.ValueStoragePolicy.OverrideBlobSeparationMinimumSize
85+
}
86+
if opts.BlobFilesDisabled || minimumValueSize == 0 || opts.SpanPolicy.ValueStoragePolicy.DisableBlobSeparation {
87+
writer.valSep = neverSeparateValues
88+
} else {
89+
newBlobObject := func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
90+
// The ObjectMetadata collected by the value separator will not be
91+
// exposed by this writer, since this store does not yet know about
92+
// these objects. However, we must provide a unique file number for
93+
// each new blob file because the value separator uses file ids to
94+
// retrieve the index of the blob file within the sst's tracked blob
95+
// references array. The reference id (array index) is what is then
96+
// written to the inline blob handle.
97+
newHandle, err := opts.NewBlobFileFn()
98+
if err != nil {
99+
return nil, objstorage.ObjectMetadata{}, err
100+
}
101+
nextFileNum := writer.blobFileNum
102+
writer.blobFileNum++
103+
return newHandle, objstorage.ObjectMetadata{DiskFileNum: nextFileNum}, nil
104+
}
105+
writer.valSep = NewWriteNewBlobFiles(
106+
opts.SSTWriterOpts.Comparer,
107+
newBlobObject,
108+
opts.BlobWriterOpts,
109+
minimumValueSize,
110+
opts.MVCCGarbageValueSeparationMinSize,
111+
WriteNewBlobFilesOptions{
112+
DisableValueSeparationBySuffix: opts.SpanPolicy.ValueStoragePolicy.DisableSeparationBySuffix,
113+
ShortAttrExtractor: opts.SSTWriterOpts.ShortAttributeExtractor,
114+
InvalidValueCallback: func(userKey []byte, value []byte, err error) {
115+
writer.err = errors.CombineErrors(writer.err, err)
116+
},
117+
},
118+
)
119+
}
120+
121+
return writer
122+
}
123+
124+
// Error returns the current accumulated error if any.
125+
func (w *SSTBlobWriter) Error() error {
126+
return errors.CombineErrors(w.err, w.Writer.Error())
127+
}
128+
129+
// Set sets the value for the given key. The sequence number is set to 0.
130+
// Values may be separated into blob files depending on the value separation
131+
// strategy configured for the writer. Intended for use to externally construct
132+
// an sstable with its blob files before ingestion into a DB. For a given
133+
// SSTBlobWriter, the keys passed to Set must be in strictly increasing order.
134+
func (w *SSTBlobWriter) Set(key, value []byte) error {
135+
if err := w.Error(); err != nil {
136+
return err
137+
}
138+
139+
if w.isStrictObsolete {
140+
return errors.Errorf("use raw writer Add in strict obsolete mode")
141+
}
142+
143+
w.kvScratch.K = base.MakeInternalKey(key, 0, sstable.InternalKeyKindSet)
144+
w.kvScratch.V = base.MakeInPlaceValue(value)
145+
isLikelyMVCCGarbage := sstable.IsLikelyMVCCGarbage(
146+
key, w.Raw().PrevPointKeyKind(), sstable.InternalKeyKindSet, len(value), w.Writer.Raw().IsPrefixEqualPrev)
147+
return w.valSep.Add(w.Raw(), &w.kvScratch, false, isLikelyMVCCGarbage)
148+
}
149+
150+
// BlobWriterMetas returns a slice of blob.FileWriterStats describing the
151+
// blob files written by this SSTBlobWriter. The ordering of the returned
152+
// slice matches the ordering of blob files as they should appear in the
153+
// sstable's manifest.BlobReferences. Close must be called before calling this
154+
// method.
155+
func (w *SSTBlobWriter) BlobWriterMetas() ([]blob.FileWriterStats, error) {
156+
if !w.closed {
157+
return nil, errors.New("blob writer not closed")
158+
}
159+
return w.blobFilesMeta, nil
160+
}
161+
162+
// Close closes both the sstable writer and the blob file writer if any.
163+
func (w *SSTBlobWriter) Close() error {
164+
w.err = errors.CombineErrors(w.err, w.Writer.Close())
165+
meta, err := w.valSep.FinishOutput()
166+
if err != nil {
167+
w.err = errors.CombineErrors(w.err, err)
168+
}
169+
for _, blobFile := range meta.NewBlobFiles {
170+
w.blobFilesMeta = append(w.blobFilesMeta, blobFile.FileStats)
171+
}
172+
w.closed = true
173+
return w.err
174+
}

valsep/sst_blob_writer_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package valsep
6+
7+
import (
8+
"bytes"
9+
"context"
10+
"fmt"
11+
"strconv"
12+
"strings"
13+
"testing"
14+
15+
"github.com/cockroachdb/crlib/testutils/leaktest"
16+
"github.com/cockroachdb/datadriven"
17+
"github.com/cockroachdb/pebble/internal/base"
18+
"github.com/cockroachdb/pebble/internal/testkeys"
19+
"github.com/cockroachdb/pebble/objstorage"
20+
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
21+
"github.com/cockroachdb/pebble/sstable"
22+
"github.com/cockroachdb/pebble/vfs"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
func TestSSTBlobWriter(t *testing.T) {
27+
defer leaktest.AfterTest(t)()
28+
runDataDriven(t, "testdata/sst_blob_writer")
29+
}
30+
31+
// The span policy string is in the form "(<option1>=<val>,<option2>=<val>...)"
32+
func parseSpanPolicy(t *testing.T, spanPolicyStr string) base.SpanPolicy {
33+
spanPolicyStr = strings.TrimPrefix(spanPolicyStr, "(")
34+
spanPolicyStr = strings.TrimSuffix(spanPolicyStr, ")")
35+
var policy base.ValueStoragePolicyAdjustment
36+
var err error
37+
for part := range strings.SplitSeq(spanPolicyStr, ",") {
38+
fieldParts := strings.Split(part, "=")
39+
switch fieldParts[0] {
40+
case "no-value-separation":
41+
policy.DisableBlobSeparation = true
42+
case "value-separation-min-size":
43+
policy.OverrideBlobSeparationMinimumSize, err = strconv.Atoi(fieldParts[1])
44+
if err != nil {
45+
t.Fatalf("parsing value-separation-min-size: %v", err)
46+
}
47+
case "disable-value-separation-by-suffix":
48+
policy.DisableSeparationBySuffix = true
49+
default:
50+
t.Fatalf("unrecognized span policy option: %s", fieldParts[0])
51+
}
52+
}
53+
54+
return base.SpanPolicy{
55+
ValueStoragePolicy: policy,
56+
}
57+
}
58+
59+
func parseBuildSSTBlobWriterOptions(t *testing.T, td *datadriven.TestData) SSTBlobWriterOptions {
60+
opts := SSTBlobWriterOptions{}
61+
td.MaybeScanArgs(t, "value-separation-min-size", &opts.ValueSeparationMinSize)
62+
63+
var spanPolicyStr string
64+
td.MaybeScanArgs(t, "span-policy", &spanPolicyStr)
65+
if spanPolicyStr != "" {
66+
opts.SpanPolicy = parseSpanPolicy(t, spanPolicyStr)
67+
}
68+
return opts
69+
}
70+
71+
func runDataDriven(t *testing.T, file string) {
72+
datadriven.RunTest(t, file, func(t *testing.T, td *datadriven.TestData) string {
73+
ctx := context.Background()
74+
switch td.Cmd {
75+
case "build":
76+
var buf bytes.Buffer
77+
fs := vfs.WithLogging(vfs.NewMem(), func(format string, args ...any) {
78+
fmt.Fprint(&buf, "# ")
79+
fmt.Fprintf(&buf, format, args...)
80+
fmt.Fprintln(&buf)
81+
})
82+
objSettings := objstorageprovider.DefaultSettings(fs, "")
83+
objStore, err := objstorageprovider.Open(objSettings)
84+
require.NoError(t, err)
85+
blobFileCount := 0
86+
opts := parseBuildSSTBlobWriterOptions(t, td)
87+
opts.SSTWriterOpts.Comparer = testkeys.Comparer
88+
opts.SSTWriterOpts.TableFormat = sstable.TableFormatPebblev7
89+
opts.NewBlobFileFn = func() (objstorage.Writable, error) {
90+
fnum := blobFileCount
91+
w, _, err := objStore.Create(ctx, base.FileTypeBlob, base.DiskFileNum(fnum), objstorage.CreateOptions{})
92+
if err != nil {
93+
return nil, err
94+
}
95+
blobFileCount++
96+
return w, err
97+
}
98+
sstHandle, _, err := objStore.Create(ctx, base.FileTypeTable, 0, objstorage.CreateOptions{})
99+
require.NoError(t, err)
100+
writer := NewSSTBlobWriter(sstHandle, opts)
101+
defer func() {
102+
if !writer.closed {
103+
_ = writer.Close()
104+
}
105+
}()
106+
kvs, err := sstable.ParseTestKVsAndSpans(td.Input, nil)
107+
if err != nil {
108+
return fmt.Sprintf("error parsing input: %v", err)
109+
}
110+
111+
for _, kv := range kvs {
112+
keyKind := kv.Key.Kind()
113+
if kv.IsKeySpan() {
114+
keyKind = kv.Span.Keys[0].Kind()
115+
}
116+
switch keyKind {
117+
case sstable.InternalKeyKindSet, sstable.InternalKeyKindSetWithDelete:
118+
if err := writer.Set(kv.Key.UserKey, kv.Value); err != nil {
119+
return fmt.Sprintf("error putting key %s: %v", kv.Key.UserKey, err)
120+
}
121+
case sstable.InternalKeyKindDelete, sstable.InternalKeyKindDeleteSized:
122+
if err := writer.Delete(kv.Key.UserKey); err != nil {
123+
return fmt.Sprintf("error deleting key %s: %v", kv.Key.UserKey, err)
124+
}
125+
case base.InternalKeyKindRangeDelete:
126+
if err := writer.DeleteRange(kv.Span.Start, kv.Span.End); err != nil {
127+
return fmt.Sprintf("error deleting range %s-%s: %v", kv.Span.Start, kv.Span.End, err)
128+
}
129+
case sstable.InternalKeyKindMerge:
130+
if err := writer.Merge(kv.Key.UserKey, kv.Value); err != nil {
131+
return fmt.Sprintf("error merging key %s: %v", kv.Key.UserKey, err)
132+
}
133+
case base.InternalKeyKindRangeKeySet:
134+
if err := writer.RangeKeySet(kv.Span.Start, kv.Span.End, nil, kv.Value); err != nil {
135+
return fmt.Sprintf("error setting range key %s-%s: %v", kv.Span.Start, kv.Span.End, err)
136+
}
137+
case base.InternalKeyKindRangeKeyUnset:
138+
if err := writer.RangeKeyUnset(kv.Span.Start, kv.Span.End, kv.Key.UserKey); err != nil {
139+
return fmt.Sprintf("error unsetting range key %s-%s: %v", kv.Span.Start, kv.Span.End, err)
140+
}
141+
case base.InternalKeyKindRangeKeyDelete:
142+
if err := writer.RangeKeyDelete(kv.Span.Start, kv.Span.End); err != nil {
143+
return fmt.Sprintf("error deleting range key %s-%s: %v", kv.Span.Start, kv.Span.End, err)
144+
}
145+
default:
146+
return fmt.Sprintf("unsupported key kind %v", kv.Key.Kind())
147+
}
148+
}
149+
150+
if err := writer.Close(); err != nil {
151+
return fmt.Sprintf("error closing writer: %v", err)
152+
}
153+
154+
tableMeta, err := writer.Metadata()
155+
if err != nil {
156+
return fmt.Sprintf("error getting metadata: %v", err)
157+
}
158+
159+
blobMetas, err := writer.BlobWriterMetas()
160+
if err != nil {
161+
return fmt.Sprintf("error getting blob metas: %v", err)
162+
}
163+
164+
var outputBuf bytes.Buffer
165+
// Print some sst properties.
166+
fmt.Fprintf(&outputBuf, "size:%d\n", tableMeta.Size)
167+
outputBuf.WriteString("blobfiles:")
168+
require.Equal(t, blobFileCount, len(blobMetas))
169+
if len(blobMetas) > 0 {
170+
outputBuf.WriteString("\n")
171+
for i, bm := range blobMetas {
172+
fmt.Fprintf(&outputBuf, "%d: %s\n", i+1, bm.String())
173+
}
174+
} else {
175+
outputBuf.WriteString(" none\n")
176+
}
177+
return outputBuf.String()
178+
default:
179+
return fmt.Sprintf("unrecognized command %s", td.Cmd)
180+
}
181+
})
182+
}

0 commit comments

Comments
 (0)