Skip to content

Commit bb0e2d1

Browse files
committed
db: add logic for blob file rewriting
This patch adds logic for rewriting a blob file, given referencing sstables, by `OR`ing together all referencing sstables' bitmaps to avoid writing never-referenced values.
1 parent ff0e2a6 commit bb0e2d1

File tree

9 files changed

+556
-11
lines changed

9 files changed

+556
-11
lines changed

blob_rewrite.go

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package pebble
6+
7+
import (
8+
"container/heap"
9+
"context"
10+
"slices"
11+
12+
"github.com/cockroachdb/errors"
13+
"github.com/cockroachdb/pebble/internal/base"
14+
"github.com/cockroachdb/pebble/internal/manifest"
15+
"github.com/cockroachdb/pebble/sstable"
16+
"github.com/cockroachdb/pebble/sstable/blob"
17+
"github.com/cockroachdb/pebble/sstable/block"
18+
"github.com/cockroachdb/pebble/sstable/colblk"
19+
)
20+
21+
// blockHeap is a min-heap of blob reference liveness encodings, ordered by
22+
// blockID. We use this to help us determine the overall liveness of values in
23+
// each blob block by combining the blob reference liveness encodings of all
24+
// referencing sstables for a particular blockID.
25+
type blockHeap []*sstable.BlobRefLivenessEncoding
26+
27+
// Len implements sort.Interface.
28+
func (h blockHeap) Len() int { return len(h) }
29+
30+
// Less implements sort.Interface.
31+
func (h blockHeap) Less(i, j int) bool { return h[i].BlockID < h[j].BlockID }
32+
33+
// Swap implements sort.Interface.
34+
func (h blockHeap) Swap(i, j int) {
35+
h[i], h[j] = h[j], h[i]
36+
}
37+
38+
// Push implements heap.Interface.
39+
func (h *blockHeap) Push(x any) {
40+
blobEnc := x.(*sstable.BlobRefLivenessEncoding)
41+
*h = append(*h, blobEnc)
42+
}
43+
44+
// Pop implements heap.Interface.
45+
func (h *blockHeap) Pop() any {
46+
old := *h
47+
n := len(old)
48+
item := old[n-1]
49+
old[n-1] = nil
50+
*h = old[0 : n-1]
51+
return item
52+
}
53+
54+
// accumulatedBlockData holds the accumulated liveness data for blockID.
55+
type accumulatedBlockData struct {
56+
blockID blob.BlockID
57+
valuesSize int
58+
liveValueIDs []int
59+
}
60+
61+
// blobFileMapping implements blob.FileMapping to always map to the input blob
62+
// file.
63+
type blobFileMapping struct {
64+
fileNum base.DiskFileNum
65+
}
66+
67+
// Assert that (*blobFileMapping) implements blob.FileMapping.
68+
var _ blob.FileMapping = (*blobFileMapping)(nil)
69+
70+
func (m *blobFileMapping) Lookup(fileID base.BlobFileID) (base.DiskFileNum, bool) {
71+
return m.fileNum, true
72+
}
73+
74+
// blobFileRewriter is responsible for rewriting blob files by combining and
75+
// processing blob reference liveness encodings from multiple SSTables. It
76+
// maintains state for writing to an output blob file.
77+
type blobFileRewriter struct {
78+
fc *fileCacheHandle
79+
env block.ReadEnv
80+
sstables []*manifest.TableMetadata
81+
82+
inputBlob manifest.BlobFileMetadata
83+
valueFetcher blob.ValueFetcher
84+
fileMapping blobFileMapping
85+
blkHeap blockHeap
86+
87+
// Current blob writer state.
88+
writer *blob.FileWriter
89+
}
90+
91+
func newBlobFileRewriter(
92+
fc *fileCacheHandle,
93+
env block.ReadEnv,
94+
writer *blob.FileWriter,
95+
sstables []*manifest.TableMetadata,
96+
inputBlob manifest.BlobFileMetadata,
97+
) *blobFileRewriter {
98+
return &blobFileRewriter{
99+
fc: fc,
100+
env: env,
101+
writer: writer,
102+
sstables: sstables,
103+
inputBlob: inputBlob,
104+
fileMapping: blobFileMapping{fileNum: inputBlob.Physical.FileNum},
105+
blkHeap: blockHeap{},
106+
}
107+
}
108+
109+
// generateHeap populates rw.blkHeap with the blob reference liveness encodings
110+
// for each referencing sstable, rw.sstables.
111+
func (rw *blobFileRewriter) generateHeap() error {
112+
ctx := context.TODO()
113+
heap.Init(&rw.blkHeap)
114+
115+
var decoder colblk.ReferenceLivenessBlockDecoder
116+
// For each sstable that references the input blob file, push its
117+
// sstable.BlobLivenessEncoding on to the heap.
118+
for _, sst := range rw.sstables {
119+
// Validate that the sstable contains a reference to the input blob
120+
// file.
121+
refID, ok := sst.BlobReferences.IDByBlobFileID(rw.inputBlob.FileID)
122+
if !ok {
123+
return errors.AssertionFailedf("table %s doesn't contain a reference to blob file %s",
124+
sst.TableNum, rw.inputBlob.FileID)
125+
}
126+
err := rw.fc.withReader(ctx, rw.env, sst, func(r *sstable.Reader, _ sstable.ReadEnv) error {
127+
h, err := r.ReadBlobRefIndexBlock(ctx, rw.env)
128+
if err != nil {
129+
return errors.CombineErrors(err, r.Close())
130+
}
131+
decoder.Init(h.BlockData())
132+
bitmapEncodings := slices.Clone(decoder.LivenessAtReference(int(refID)))
133+
h.Release()
134+
// TODO(annie): We should instead maintain 1 heap item per sstable
135+
// instead of 1 heap item per sstable block ref to reduce the heap
136+
// comparisons to O(sstables).
137+
for _, enc := range sstable.DecodeBlobRefLivenessEncoding(bitmapEncodings) {
138+
heap.Push(&rw.blkHeap, &enc)
139+
}
140+
return r.Close()
141+
})
142+
if err != nil {
143+
return err
144+
}
145+
}
146+
return nil
147+
}
148+
149+
// copyBlockValues copies the live values from the given block to the output
150+
// blob file, flushing the current block before if necessary. If a flush is
151+
// performed, it returns true to signal the callee that we have started on a
152+
// new block.
153+
func (rw *blobFileRewriter) copyBlockValues(
154+
ctx context.Context, toWrite *accumulatedBlockData, currentBlockSize int,
155+
) (bool, error) {
156+
shouldFlush := rw.writer.ShouldFlush(currentBlockSize, currentBlockSize+toWrite.valuesSize)
157+
if shouldFlush {
158+
rw.writer.ForceFlush()
159+
}
160+
slices.Sort(toWrite.liveValueIDs)
161+
for _, valueID := range toWrite.liveValueIDs {
162+
value, _, err := rw.valueFetcher.Fetch(ctx, rw.inputBlob.FileID, toWrite.blockID, blob.BlockValueID(valueID))
163+
if err != nil {
164+
return shouldFlush, err
165+
}
166+
rw.writer.AddValue(value)
167+
}
168+
return shouldFlush, nil
169+
}
170+
171+
func (rw *blobFileRewriter) Rewrite() error {
172+
ctx := context.TODO()
173+
174+
rw.valueFetcher.Init(&rw.fileMapping, rw.fc, rw.env)
175+
defer func() { _ = rw.valueFetcher.Close() }()
176+
177+
err := rw.generateHeap()
178+
if err != nil {
179+
return err
180+
}
181+
182+
// Begin constructing our output blob file. We maintain a map of blockID
183+
// to accumulated liveness data across all referencing sstables.
184+
var lastAccEncoding *accumulatedBlockData
185+
blockSize := 0
186+
for rw.blkHeap.Len() > 0 {
187+
currBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
188+
189+
// Initialize the last accumulated block if nil.
190+
if lastAccEncoding == nil {
191+
lastAccEncoding = &accumulatedBlockData{
192+
blockID: currBlock.BlockID,
193+
valuesSize: currBlock.ValuesSize,
194+
liveValueIDs: slices.Collect(sstable.IterSetBitsInRunLengthBitmap(currBlock.Bitmap)),
195+
}
196+
}
197+
// If we are encountering a new block, write the last accumulated block to
198+
// the blob file.
199+
if lastAccEncoding.blockID != currBlock.BlockID {
200+
// Add virtual block mappings for all blocks between the last block
201+
// we encountered and the current block.
202+
for blockID := lastAccEncoding.blockID; blockID < currBlock.BlockID; blockID++ {
203+
rw.writer.BeginNewVirtualBlock(blockID)
204+
}
205+
// Write the last accumulated block's values to the blob file.
206+
if flushed, err := rw.copyBlockValues(ctx, lastAccEncoding, blockSize); err != nil {
207+
return err
208+
} else if flushed {
209+
blockSize = 0
210+
} else {
211+
blockSize += lastAccEncoding.valuesSize
212+
}
213+
}
214+
215+
// Update the accumulated encoding for this block.
216+
lastAccEncoding.valuesSize += currBlock.ValuesSize
217+
lastAccEncoding.liveValueIDs = slices.AppendSeq(lastAccEncoding.liveValueIDs,
218+
sstable.IterSetBitsInRunLengthBitmap(currBlock.Bitmap))
219+
}
220+
221+
// Copy the last accumulated block.
222+
rw.writer.BeginNewVirtualBlock(lastAccEncoding.blockID)
223+
if _, err := rw.copyBlockValues(ctx, lastAccEncoding, blockSize); err != nil {
224+
return err
225+
}
226+
227+
_, err = rw.writer.Close()
228+
if err != nil {
229+
return err
230+
}
231+
return nil
232+
}

0 commit comments

Comments
 (0)