Commit 65f3f9b

blob: uncouple blob file rewriting logic
Uncouple the logic that actually writes out a rewritten blob file from the logic that identifies which values remain live. This will help us test both parts in isolation. Additionally, this commit fixes a bug in which values with the same BlockID could be split across different physical blocks if the blob.FileWriter's FlushGovernor decided to flush mid-block during a call to AddValue.
1 parent 3a1173a commit 65f3f9b
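
To illustrate the bug this commit fixes, here is a minimal, self-contained Go sketch. The flushGovernor and writer types are hypothetical stand-ins, not Pebble's API; it contrasts the old per-value flush decision, which can split one BlockID's values across two physical blocks, with the up-front whole-block decision that CopyBlock in sstable/blob/rewrite.go (below) now makes.

package main

import "fmt"

// flushGovernor is a hypothetical stand-in for blob.FileWriter's FlushGovernor.
type flushGovernor struct{ maxBlockSize int }

func (g flushGovernor) ShouldFlush(cur, next int) bool { return next > g.maxBlockSize }

// writer is a hypothetical stand-in that tracks physical block sizes only.
type writer struct {
	gov       flushGovernor
	blockSize int   // bytes in the current (unflushed) physical block
	blocks    []int // sizes of flushed physical blocks
}

func (w *writer) flush() {
	w.blocks = append(w.blocks, w.blockSize)
	w.blockSize = 0
}

// addValue mimics the old path: each AddValue call may flush mid-block, so two
// values with the same BlockID can land in different physical blocks.
func (w *writer) addValue(n int) {
	if w.blockSize > 0 && w.gov.ShouldFlush(w.blockSize, w.blockSize+n) {
		w.flush()
	}
	w.blockSize += n
}

// copyBlock mimics the new path: decide once, up front, using the block's
// total value size, then add every value without flushing in between.
func (w *writer) copyBlock(sizes []int) {
	total := 0
	for _, n := range sizes {
		total += n
	}
	if w.blockSize > 0 && w.gov.ShouldFlush(w.blockSize, w.blockSize+total) {
		w.flush()
	}
	for _, n := range sizes {
		w.blockSize += n
	}
}

func main() {
	// One rewritten block holds two 6-byte values; 4 bytes are already pending.
	old := &writer{gov: flushGovernor{maxBlockSize: 10}, blockSize: 4}
	old.addValue(6)
	old.addValue(6)
	fmt.Println(old.blocks) // [10]: flushed mid-block, the two values were split

	nw := &writer{gov: flushGovernor{maxBlockSize: 10}, blockSize: 4}
	nw.copyBlock([]int{6, 6})
	fmt.Println(nw.blocks) // [4]: flushed first, both values stay together
}

Note that after the up-front flush the whole block is written even if it exceeds the governor's target; keeping all values with the same BlockID in one physical block is the invariant the rewrite needs.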

6 files changed: +162, -103 lines changed


blob_rewrite.go

Lines changed: 38 additions & 80 deletions
@@ -280,14 +280,20 @@ func (d *DB) runBlobFileRewriteLocked(
 		Path:    d.objProvider.Path(objMeta),
 		FileNum: objMeta.DiskFileNum,
 	})
-	// Initialize a blob file writer. We pass L6 to MakeBlobWriterOptions.
+	// Initialize a blob file rewriter. We pass L6 to MakeBlobWriterOptions.
 	// There's no single associated level with a blob file. A long-lived blob
 	// file that gets rewritten is likely to mostly be referenced from L6.
 	// TODO(jackson): Consider refactoring to remove the level association.
-	writer := blob.NewFileWriter(newDiskFileNum, writable, d.opts.MakeBlobWriterOptions(6))
-
+	rewriter := newBlobFileRewriter(
+		d.fileCache,
+		env,
+		newDiskFileNum,
+		writable,
+		d.opts.MakeBlobWriterOptions(6),
+		c.referencingTables,
+		c.input,
+	)
 	// Perform the rewrite.
-	rewriter := newBlobFileRewriter(d.fileCache, env, writer, c.referencingTables, c.input)
 	stats, err := rewriter.Rewrite(ctx)
 	if err != nil {
 		return objstorage.ObjectMetadata{}, nil, err
@@ -361,51 +367,35 @@ type blockValues struct {
 	liveValueIDs []int
 }
 
-// blobFileMapping implements blob.FileMapping to always map to the input blob
-// file.
-type blobFileMapping struct {
-	fileNum base.DiskFileNum
-}
-
-// Assert that (*blobFileMapping) implements blob.FileMapping.
-var _ blob.FileMapping = (*blobFileMapping)(nil)
-
-func (m *blobFileMapping) Lookup(fileID base.BlobFileID) (base.DiskFileNum, bool) {
-	return m.fileNum, true
-}
-
 // blobFileRewriter is responsible for rewriting blob files by combining and
 // processing blob reference liveness encodings from multiple SSTables. It
 // maintains state for writing to an output blob file.
 type blobFileRewriter struct {
-	fc       *fileCacheHandle
-	env      block.ReadEnv
-	sstables []*manifest.TableMetadata
-
-	inputBlob    manifest.BlobFileMetadata
-	valueFetcher blob.ValueFetcher
-	fileMapping  blobFileMapping
-	blkHeap      blockHeap
-
-	// Current blob writer state.
-	writer *blob.FileWriter
+	fc        *fileCacheHandle
+	readEnv   block.ReadEnv
+	sstables  []*manifest.TableMetadata
+	inputBlob manifest.BlobFileMetadata
+	rw        *blob.FileRewriter
+	blkHeap   blockHeap
}
 
 func newBlobFileRewriter(
 	fc *fileCacheHandle,
-	env block.ReadEnv,
-	writer *blob.FileWriter,
+	readEnv block.ReadEnv,
+	outputFileNum base.DiskFileNum,
+	w objstorage.Writable,
+	opts blob.FileWriterOptions,
 	sstables []*manifest.TableMetadata,
 	inputBlob manifest.BlobFileMetadata,
 ) *blobFileRewriter {
+	rw := blob.NewFileRewriter(inputBlob.FileID, inputBlob.Physical.FileNum, fc, readEnv, outputFileNum, w, opts)
 	return &blobFileRewriter{
-		fc:          fc,
-		env:         env,
-		writer:      writer,
-		sstables:    sstables,
-		inputBlob:   inputBlob,
-		fileMapping: blobFileMapping{fileNum: inputBlob.Physical.FileNum},
-		blkHeap:     blockHeap{},
+		fc:        fc,
+		readEnv:   readEnv,
+		rw:        rw,
+		sstables:  sstables,
+		inputBlob: inputBlob,
+		blkHeap:   blockHeap{},
 	}
 }
 
@@ -426,7 +416,7 @@ func (rw *blobFileRewriter) generateHeap() error {
 			return errors.AssertionFailedf("table %s doesn't contain a reference to blob file %s",
 				sst.TableNum, rw.inputBlob.FileID)
 		}
-		err := rw.fc.withReader(ctx, rw.env, sst, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
+		err := rw.fc.withReader(ctx, rw.readEnv, sst, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
 			h, err := r.ReadBlobRefIndexBlock(ctx, readEnv.Block)
 			if err != nil {
 				return err
@@ -449,42 +439,8 @@ func (rw *blobFileRewriter) generateHeap() error {
 	return nil
 }
 
-// copyBlockValues copies the live values from the given block to the output
-// blob file, flushing the current block before if necessary.
-func (rw *blobFileRewriter) copyBlockValues(ctx context.Context, finishedBlock blockValues) error {
-	shouldFlush := rw.writer.ShouldFlushBefore(finishedBlock.valuesSize)
-	if shouldFlush {
-		rw.writer.ForceFlush()
-	}
-
-	// Record the mapping from the virtual block ID to the current physical
-	// block and value ID offset.
-	rw.writer.BeginNewVirtualBlock(finishedBlock.blockID)
-	slices.Sort(finishedBlock.liveValueIDs)
-
-	for i, valueID := range finishedBlock.liveValueIDs {
-		if i > 0 && finishedBlock.liveValueIDs[i-1]+1 != valueID {
-			// There's a gap in the referenced value IDs.
-			for missing := finishedBlock.liveValueIDs[i-1] + 1; missing < valueID; missing++ {
-				rw.writer.AddValue(nil)
-			}
-		}
-
-		value, _, err := rw.valueFetcher.Fetch(ctx, rw.inputBlob.FileID, finishedBlock.blockID, blob.BlockValueID(valueID))
-		if err != nil {
-			return err
-		}
-		rw.writer.AddValue(value)
-	}
-	return nil
-}
-
 func (rw *blobFileRewriter) Rewrite(ctx context.Context) (blob.FileWriterStats, error) {
-	rw.valueFetcher.Init(&rw.fileMapping, rw.fc, rw.env)
-	defer func() { _ = rw.valueFetcher.Close() }()
-
-	err := rw.generateHeap()
-	if err != nil {
+	if err := rw.generateHeap(); err != nil {
 		return blob.FileWriterStats{}, err
 	}
 	if rw.blkHeap.Len() == 0 {
@@ -494,7 +450,7 @@ func (rw *blobFileRewriter) Rewrite(ctx context.Context) (blob.FileWriterStats,
 	// Begin constructing our output blob file. We maintain a map of blockID
 	// to accumulated liveness data across all referencing sstables.
 	firstBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
-	pendingBlock := blockValues{
+	pending := blockValues{
 		blockID:      firstBlock.BlockID,
 		valuesSize:   firstBlock.ValuesSize,
 		liveValueIDs: slices.Collect(sstable.IterSetBitsInRunLengthBitmap(firstBlock.Bitmap)),
@@ -504,21 +460,23 @@ func (rw *blobFileRewriter) Rewrite(ctx context.Context) (blob.FileWriterStats,
 
 		// If we are encountering a new block, write the last accumulated block
 		// to the blob file.
-		if pendingBlock.blockID != nextBlock.BlockID {
+		if pending.blockID != nextBlock.BlockID {
 			// Write the last accumulated block's values to the blob file.
-			if err := rw.copyBlockValues(ctx, pendingBlock); err != nil {
+			err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
+			if err != nil {
 				return blob.FileWriterStats{}, err
 			}
 		}
 		// Update the accumulated encoding for this block.
-		pendingBlock.valuesSize += nextBlock.ValuesSize
-		pendingBlock.liveValueIDs = slices.AppendSeq(pendingBlock.liveValueIDs,
+		pending.valuesSize += nextBlock.ValuesSize
+		pending.liveValueIDs = slices.AppendSeq(pending.liveValueIDs,
 			sstable.IterSetBitsInRunLengthBitmap(nextBlock.Bitmap))
 	}
 
 	// Copy the last accumulated block.
-	if err := rw.copyBlockValues(ctx, pendingBlock); err != nil {
+	err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
+	if err != nil {
 		return blob.FileWriterStats{}, err
 	}
-	return rw.writer.Close()
+	return rw.rw.Close()
 }
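
Aside: the Rewrite loop above is a standard accumulate-and-emit-on-key-change merge over the heap-ordered encodings. A minimal sketch of that pattern, with pre-sorted slices standing in for the block heap (the encoding type and values here are illustrative, not Pebble's):

package main

import (
	"fmt"
	"slices"
)

// encoding is an illustrative stand-in for sstable.BlobRefLivenessEncoding.
type encoding struct {
	blockID int
	liveIDs []int
}

func main() {
	// Encodings for the same input blob file from multiple sstables, in the
	// blockID order the heap would yield them.
	encs := []encoding{
		{blockID: 0, liveIDs: []int{0, 2}},
		{blockID: 0, liveIDs: []int{1}},
		{blockID: 2, liveIDs: []int{5}},
	}

	emit := func(e encoding) { // stands in for rw.rw.CopyBlock(...)
		slices.Sort(e.liveIDs)
		fmt.Printf("CopyBlock(block=%d, liveValueIDs=%v)\n", e.blockID, e.liveIDs)
	}

	pending := encs[0]
	for _, next := range encs[1:] {
		if pending.blockID != next.blockID {
			emit(pending) // new block: copy the accumulated one
			pending = encoding{blockID: next.blockID}
		}
		// Accumulate this sstable's live value IDs for the block.
		pending.liveIDs = append(pending.liveIDs, next.liveIDs...)
	}
	emit(pending) // copy the last accumulated block
}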

blob_rewrite_test.go

Lines changed: 2 additions & 2 deletions
@@ -192,8 +192,8 @@ func TestBlobRewrite(t *testing.T) {
 		outputWritable, _, err := objStore.Create(ctx, base.FileTypeBlob, fn, objstorage.CreateOptions{})
 		require.NoError(t, err)
 
-		blobWriter := blob.NewFileWriter(fn, outputWritable, blob.FileWriterOptions{})
-		rewriter := newBlobFileRewriter(mockFC, block.ReadEnv{}, blobWriter, sstables, inputBlob)
+		rewriter := newBlobFileRewriter(mockFC, block.ReadEnv{}, fn, outputWritable,
+			blob.FileWriterOptions{}, sstables, inputBlob)
 		stats, err := rewriter.Rewrite(context.Background())
 		if err != nil {
 			fmt.Fprintf(&buf, "rewrite error: %v\n", err)

internal/blobtest/handles.go

Lines changed: 1 addition & 1 deletion
@@ -243,7 +243,7 @@ func (bv *Values) WriteFiles(
 	prevID := -1
 	for i, handle := range handles {
 		if i > 0 && handles[i-1].BlockID != handle.BlockID {
-			writer.ForceFlush()
+			writer.FlushForTesting()
 			prevID = -1
 		}
 		// The user of a blobtest.Values may specify a value ID for a handle. If

sstable/blob/blob.go

Lines changed: 11 additions & 19 deletions
@@ -162,19 +162,21 @@ func (w *FileWriter) AddValue(v []byte) Handle {
 	}
 }
 
-// BeginNewVirtualBlock adds a virtual block mapping to the current physical
+// beginNewVirtualBlock adds a virtual block mapping to the current physical
 // block and valueID offset within the block.
 //
-// When a blob file is rewritten, BeginNewVirtualBlock is called for each block
+// When a blob file is rewritten, beginNewVirtualBlock is called for each block
 // in the original blob file before adding any of the block's extant values.
-// BeginNewVirtualBlock records a mapping from the original block ID (referred
+// beginNewVirtualBlock records a mapping from the original block ID (referred
 // to as a virtual block) to a tuple of the physical block index and the offset
 // of the BlockValueIDs within the new physical block.
 //
 // This mapping is used by readers to determine which physical block contains a
 // given virtual block, and how to map BlockValueIDs from the given virtual
 // block to BlockValueIDs in the physical block.
-func (w *FileWriter) BeginNewVirtualBlock(vblockID BlockID) {
+func (w *FileWriter) beginNewVirtualBlock(vblockID BlockID) {
+	// TODO(jackson): Update tests to use the blob.FileRewriter type and move
+	// this into the FileRewriter.
 	w.indexEncoder.AddVirtualBlockMapping(vblockID, int(w.stats.BlockCount),
 		BlockValueID(w.valuesEncoder.Count()))
 }
@@ -195,29 +197,19 @@ func (w *FileWriter) EstimatedSize() uint64 {
 	return sz
 }
 
-// ForceFlush flushes the current block to the write queue. Writers should
-// generally not call ForceFlush, and instead let the heuristics configured
+// FlushForTesting flushes the current block to the write queue. Writers should
+// generally not call FlushForTesting, and instead let the heuristics configured
 // through FileWriterOptions handle flushing.
 //
-// For blob file rewriting, ForceFlush needs to be called to ensure that there
-// is a 1:1 remapping of virtual blocks to physical blocks.
-//
-// Otherwise, it's exposed so that tests can force flushes to construct blob
-// files with arbitrary structures.
-func (w *FileWriter) ForceFlush() {
+// It's exposed so that tests can force flushes to construct blob files with
+// arbitrary structures.
+func (w *FileWriter) FlushForTesting() {
 	if w.valuesEncoder.Count() == 0 {
 		return
 	}
 	w.flush()
 }
 
-// ShouldFlush returns true if the current block should be flushed before adding
-// newDataBytes of data.
-func (w *FileWriter) ShouldFlushBefore(newDataBytes int) bool {
-	size := w.valuesEncoder.size() + block.TrailerLen
-	return w.flushGov.ShouldFlush(size, size+newDataBytes)
-}
-
 // flush flushes the current block to the write queue.
 func (w *FileWriter) flush() {
 	if w.valuesEncoder.Count() == 0 {

sstable/blob/blob_test.go

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ func TestBlobWriter(t *testing.T) {
 		case l == "---flush---":
 			w.flush()
 		case l == "---add-vblock---":
-			w.BeginNewVirtualBlock(BlockID(vBlockID))
+			w.beginNewVirtualBlock(BlockID(vBlockID))
 			vBlockID++
 		default:
 			h := w.AddValue([]byte(l))

sstable/blob/rewrite.go

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+package blob
+
+import (
+	"context"
+	"slices"
+
+	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/pebble/internal/base"
+	"github.com/cockroachdb/pebble/objstorage"
+	"github.com/cockroachdb/pebble/sstable/block"
+)
+
+// A FileRewriter copies values from an input blob file, outputting a new blob
+// file containing a subset of the original blob file's values. The original
+// Handles used to access values in the original blob file will continue to work
+// with the new blob file, as long as the value was copied during rewrite.
+type FileRewriter struct {
+	fileID base.BlobFileID
+	w      *FileWriter
+	f      ValueFetcher
+}
+
+// NewFileRewriter creates a new FileRewriter that will copy values from the
+// input blob file to the output blob file.
+func NewFileRewriter(
+	fileID base.BlobFileID,
+	inputFileNum base.DiskFileNum,
+	rp ReaderProvider,
+	readEnv block.ReadEnv,
+	outputFileNum base.DiskFileNum,
+	w objstorage.Writable,
+	opts FileWriterOptions,
+) *FileRewriter {
+	rw := &FileRewriter{
+		fileID: fileID,
+		w:      NewFileWriter(outputFileNum, w, opts),
+	}
+	rw.f.Init(inputFileMapping(inputFileNum), rp, readEnv)
+	return rw
+}
+
+// CopyBlock copies the values for the given blockID to the output blob file.
+// CopyBlock must be called with ascending blockIDs. The totalValueSize must be
+// the size of all the values indicated by valueIDs.
+func (rw *FileRewriter) CopyBlock(
+	ctx context.Context, blockID BlockID, totalValueSize int, valueIDs []int,
+) error {
+	slices.Sort(valueIDs)
+
+	// Consider whether we should flush the current physical block. We know
+	// we'll need to add totalValueSize worth of value data, and can make a
+	// decision up front. All values from the same original blockID must be
+	// located in the same physical block.
+	valuesInBlock := rw.w.valuesEncoder.Count()
+	if valuesInBlock > 0 {
+		currentBlockSize := rw.w.valuesEncoder.size() + block.TrailerLen
+		if rw.w.flushGov.ShouldFlush(currentBlockSize, currentBlockSize+totalValueSize) {
+			rw.w.flush()
+		}
+	}
+
+	// Record the mapping from the virtual block ID to the current physical
+	// block and offset within the block.
+	rw.w.beginNewVirtualBlock(blockID)
+
+	previousValueID := -1
+	for _, valueID := range valueIDs {
+		// If there is a gap in the referenced value IDs within this block, we
+		// need to represent this sparseness as empty values within the block.
+		// We can represent sparseness at the tail of a block or between blocks
+		// more compactly, but not sparseness at the beginning of a virtual
+		// block. See the doc.go comment for more details on sparseness.
+		for missingValueID := previousValueID + 1; missingValueID < valueID; missingValueID++ {
+			rw.w.stats.ValueCount++
+			rw.w.valuesEncoder.AddValue(nil)
+		}
+
+		// Retrieve the value and copy it to the output blob file.
+		value, _, err := rw.f.Fetch(ctx, rw.fileID, blockID, BlockValueID(valueID))
+		if err != nil {
+			return err
+		}
+		rw.w.stats.ValueCount++
+		rw.w.stats.UncompressedValueBytes += uint64(len(value))
+		rw.w.valuesEncoder.AddValue(value)
+		previousValueID = valueID
+	}
+	return nil
+}
+
+// Close finishes writing the output blob file and releases resources.
+func (rw *FileRewriter) Close() (FileWriterStats, error) {
+	stats, err := rw.w.Close()
+	return stats, errors.CombineErrors(err, rw.f.Close())
+}
+
+// inputFileMapping implements blob.FileMapping and always maps to itself.
+type inputFileMapping base.DiskFileNum
+
+// Assert that inputFileMapping implements blob.FileMapping.
+var _ FileMapping = inputFileMapping(0)
+
+func (m inputFileMapping) Lookup(fileID base.BlobFileID) (base.DiskFileNum, bool) {
+	return base.DiskFileNum(m), true
+}
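
For reference, a sketch of how a caller drives the new blob.FileRewriter, mirroring the wiring in newBlobFileRewriter above. Everything except the blob, block, base, and objstorage identifiers is hypothetical, and internal/base is not importable outside the pebble module, so this is illustrative only:

package blobrewrite // illustrative

import (
	"context"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/sstable/blob"
	"github.com/cockroachdb/pebble/sstable/block"
)

// liveBlock is a hypothetical aggregate of one block's liveness data.
type liveBlock struct {
	blockID        blob.BlockID
	totalValueSize int
	liveValueIDs   []int
}

// rewriteBlob shows the FileRewriter call sequence: construct, CopyBlock per
// block in ascending BlockID order, then Close.
func rewriteBlob(
	ctx context.Context,
	rp blob.ReaderProvider,
	out objstorage.Writable,
	fileID base.BlobFileID,
	inputFileNum, outputFileNum base.DiskFileNum,
	blocks []liveBlock,
) (blob.FileWriterStats, error) {
	rw := blob.NewFileRewriter(fileID, inputFileNum, rp, block.ReadEnv{}, outputFileNum, out, blob.FileWriterOptions{})
	for _, b := range blocks {
		// totalValueSize must equal the summed size of the values named by
		// liveValueIDs; FileRewriter uses it for its up-front flush decision.
		if err := rw.CopyBlock(ctx, b.blockID, b.totalValueSize, b.liveValueIDs); err != nil {
			return blob.FileWriterStats{}, err
		}
	}
	// Close finishes the output blob file and closes the internal ValueFetcher.
	return rw.Close()
}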
