Skip to content

Commit 243cff4

Browse files
committed
db: add TestBlobRewriteRandomized
Add a randomized test of blob file rewrites. TestBlobRewriteRandomized tests blob file rewriting by constructing a blob file and n sstables that each reference one value in the blob file. It then runs a blob rewrite repeatedly, passing in a random subset of the sstables as extant references. Each blob rewrite may rewrite the original blob file, or one of the previous iteration's rewritten blob files.
1 parent 28a5f11 commit 243cff4

File tree

1 file changed

+212
-0
lines changed

1 file changed

+212
-0
lines changed

blob_rewrite_test.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"bytes"
99
"context"
1010
"fmt"
11+
"math/rand/v2"
12+
"slices"
1113
"strconv"
1214
"strings"
1315
"testing"
@@ -17,9 +19,12 @@ import (
1719
"github.com/cockroachdb/datadriven"
1820
"github.com/cockroachdb/pebble/internal/base"
1921
"github.com/cockroachdb/pebble/internal/blobtest"
22+
"github.com/cockroachdb/pebble/internal/bytealloc"
23+
"github.com/cockroachdb/pebble/internal/cache"
2024
"github.com/cockroachdb/pebble/internal/compact"
2125
"github.com/cockroachdb/pebble/internal/manifest"
2226
"github.com/cockroachdb/pebble/internal/testkeys"
27+
"github.com/cockroachdb/pebble/internal/testutils"
2328
"github.com/cockroachdb/pebble/objstorage"
2429
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
2530
"github.com/cockroachdb/pebble/sstable"
@@ -212,3 +217,210 @@ func TestBlobRewrite(t *testing.T) {
212217
panic("unreachable")
213218
})
214219
}
220+
221+
// TestBlobRewriteRandomized tests blob file rewriting by constructing a blob
222+
// file and n sstables that each reference one value in the blob file.
223+
//
224+
// It then runs a blob rewrite repeatedly, passing in a random subset of the
225+
// sstables as extant references. Each blob rewrite may rewrite the original
226+
// blob file, or one of the previous iteration's rewritten blob files.
//
// After every rewrite, each preserved value is fetched from the new blob file
// using its original handle and compared against the expected bytes.
227+
func TestBlobRewriteRandomized(t *testing.T) {
228+
const numKVs = 1000
229+
const blobFileID = 100000
230+
const numRewrites = 10
231+
232+
// The seed is derived from the current time and logged so that the seed of
// a failing run is visible in the test output.
seed := time.Now().UnixNano()
233+
t.Logf("seed: %d", seed)
234+
rng := rand.New(rand.NewPCG(0, uint64(seed)))
235+
236+
// Generate keys and values.
// Key i is the zero-padded six-digit decimal form of i. Value i is the
// decimal form of i followed by a random run of 0-99 'v' bytes, so every
// value is distinct and lengths vary.
237+
keys, values := func() (keys, values [][]byte) {
238+
keys = make([][]byte, numKVs)
239+
values = make([][]byte, numKVs)
240+
var alloc bytealloc.A
241+
for i := 0; i < numKVs; i++ {
242+
var key, value []byte
243+
key, alloc = alloc.Copy([]byte(fmt.Sprintf("%06d", i)))
244+
value, alloc = alloc.Copy(append([]byte(fmt.Sprintf("%d", i)),
245+
bytes.Repeat([]byte{'v'}, rng.IntN(100))...))
246+
keys[i] = key
247+
values[i] = value
248+
}
249+
return keys, values
250+
}()
251+
252+
ctx := context.Background()
253+
objStore, err := objstorageprovider.Open(objstorageprovider.Settings{
254+
FS: vfs.NewMem(),
255+
})
256+
require.NoError(t, err)
257+
258+
// Write the source blob file.
// handles[i] records the handle returned when values[i] was added; the
// same handles are used later to read values back out of rewritten files.
259+
var originalBlobFile manifest.BlobFileMetadata
260+
handles := make([]blob.Handle, numKVs)
261+
{
262+
w, _, err := objStore.Create(ctx, base.FileTypeBlob, base.DiskFileNum(blobFileID), objstorage.CreateOptions{})
263+
require.NoError(t, err)
264+
blobWriter := blob.NewFileWriter(base.DiskFileNum(blobFileID), w, blob.FileWriterOptions{})
265+
for i := range numKVs {
266+
// With probability 1/20, flush before adding the next value.
// NOTE(review): presumably this forces a block boundary so that the
// values are spread across many blocks — confirm against
// FlushForTesting.
if rng.IntN(20) == 0 {
267+
blobWriter.FlushForTesting()
268+
}
269+
handles[i] = blobWriter.AddValue(values[i])
270+
}
271+
stats, err := blobWriter.Close()
272+
require.NoError(t, err)
273+
require.Equal(t, numKVs, int(stats.ValueCount))
274+
originalBlobFile = manifest.BlobFileMetadata{
275+
FileID: base.BlobFileID(blobFileID),
276+
Physical: &manifest.PhysicalBlobFile{
277+
FileNum: base.DiskFileNum(blobFileID),
278+
CreationTime: uint64(time.Now().Unix()),
279+
ValueSize: stats.UncompressedValueBytes,
280+
Size: stats.FileLen,
281+
},
282+
}
283+
}
284+
285+
// Write numKVs SSTables, each with 1 kv pair, all referencing the same blob
286+
// file.
// Table i uses disk file number i and holds keys[i] with a blob handle for
// values[i], so a value index also identifies the one table referencing it.
287+
originalTables := make([]*manifest.TableMetadata, numKVs)
288+
originalValueIndices := make([]int, numKVs)
289+
for i := range numKVs {
290+
w, _, err := objStore.Create(ctx, base.FileTypeTable, base.DiskFileNum(i), objstorage.CreateOptions{})
291+
require.NoError(t, err)
292+
tw := sstable.NewRawWriter(w, sstable.WriterOptions{
293+
TableFormat: sstable.TableFormatMax,
294+
})
295+
require.NoError(t, tw.AddWithBlobHandle(
296+
base.MakeInternalKey(keys[i], base.SeqNum(i), base.InternalKeyKindSet),
297+
blob.InlineHandle{
298+
InlineHandlePreface: blob.InlineHandlePreface{
299+
// NOTE(review): ReferenceID 0 presumably indexes the single entry
// of the table's BlobReferences below — confirm.
ReferenceID: blob.ReferenceID(0),
300+
ValueLen: uint32(len(values[i])),
301+
},
302+
HandleSuffix: blob.HandleSuffix{
303+
BlockID: handles[i].BlockID,
304+
ValueID: handles[i].ValueID,
305+
},
306+
},
307+
base.ShortAttribute(0),
308+
false, /* forceObsolete */
309+
))
310+
require.NoError(t, tw.Close())
311+
originalValueIndices[i] = i
312+
originalTables[i] = &manifest.TableMetadata{
313+
TableNum: base.TableNum(i),
314+
TableBacking: &manifest.TableBacking{
315+
DiskFileNum: base.DiskFileNum(i),
316+
},
317+
BlobReferences: []manifest.BlobReference{
318+
{FileID: base.BlobFileID(blobFileID), ValueSize: uint64(len(values[i]))},
319+
},
320+
}
321+
}
322+
323+
// Set up the file cache, block cache, and buffer pool that the rewriter and
// the value fetcher read through. Each is released via defer when the test
// returns.
fc := NewFileCache(1, 100)
324+
defer fc.Unref()
325+
c := cache.New(100)
326+
defer c.Unref()
327+
ch := c.NewHandle()
328+
defer ch.Close()
329+
fch := fc.newHandle(ch, objStore, base.NoopLoggerAndTracer{}, sstable.ReaderOptions{}, nil)
330+
defer fch.Close()
331+
var bufferPool block.BufferPool
332+
bufferPool.Init(4)
333+
defer bufferPool.Release()
334+
readEnv := block.ReadEnv{BufferPool: &bufferPool}
335+
336+
// sourceFile describes a blob file that may be chosen as the input of a
// rewrite: its metadata, the indices (into values/handles) of the values
// it is expected to hold, and the tables referencing those values.
type sourceFile struct {
337+
metadata manifest.BlobFileMetadata
338+
valueIndices []int
339+
referencingTables []*manifest.TableMetadata
340+
}
341+
// files starts with only the original blob file; every iteration below
// appends the file it produced.
files := []sourceFile{{
342+
metadata: originalBlobFile,
343+
valueIndices: originalValueIndices,
344+
referencingTables: originalTables,
345+
}}
346+
347+
for i := range numRewrites {
348+
// Pick the rewrite's input at random among the original file and all
// previously rewritten files.
fileIdx := rng.IntN(len(files))
349+
fileToRewrite := files[fileIdx]
350+
351+
// Choose a fresh disk file number for the rewritten blob file.
352+
newBlobFileNum := base.DiskFileNum(blobFileID + i + 1)
353+
354+
// Pick a random subset of the referencing tables to use as remaining
355+
// extant references.
// NOTE(review): whether the upper bound is inclusive depends on
// testutils.RandIntInRange — confirm. A source file holding a single
// value always preserves that value.
356+
n := 1
357+
if len(fileToRewrite.valueIndices) > 1 {
358+
n = testutils.RandIntInRange(rng, 1, len(fileToRewrite.valueIndices))
359+
}
360+
t.Logf("rewriting file %s, preserving %d values", fileToRewrite.metadata.Physical.FileNum, n)
361+
362+
// Produce the inputs for the rewrite.
// The new file keeps the original blob file ID; only the physical disk
// file number changes.
363+
newFile := sourceFile{
364+
metadata: manifest.BlobFileMetadata{
365+
FileID: base.BlobFileID(blobFileID),
366+
Physical: &manifest.PhysicalBlobFile{
367+
FileNum: newBlobFileNum,
368+
},
369+
},
370+
valueIndices: make([]int, n),
371+
referencingTables: make([]*manifest.TableMetadata, n),
372+
}
373+
// Take the first n entries of a random permutation to select n distinct
// values from the source file, along with the table referencing each.
for k, j := range rng.Perm(len(fileToRewrite.valueIndices))[:n] {
374+
valueIndex := fileToRewrite.valueIndices[j]
375+
newFile.valueIndices[k] = valueIndex
376+
newFile.referencingTables[k] = originalTables[valueIndex]
377+
}
378+
// Only valueIndices is sorted; referencingTables stays in permutation
// order.
slices.Sort(newFile.valueIndices)
379+
for _, idx := range newFile.valueIndices {
380+
t.Logf("newFile.valueIndices: %d: %q; handle: %s", idx, values[idx], handles[idx])
381+
}
382+
383+
// Rewrite the blob file.
384+
w, _, err := objStore.Create(ctx, base.FileTypeBlob, newBlobFileNum, objstorage.CreateOptions{})
385+
require.NoError(t, err)
386+
// The first flush governor argument is randomized over the powers of two
// from 128 to 4096.
// NOTE(review): presumably this is the target block size, varied so
// rewritten files get different block layouts — confirm.
opts := blob.FileWriterOptions{
387+
FlushGovernor: block.MakeFlushGovernor(128<<rng.IntN(6), 90, 100, nil),
388+
}
389+
rewriter := newBlobFileRewriter(fch, readEnv, newBlobFileNum,
390+
w, opts, newFile.referencingTables, fileToRewrite.metadata)
391+
stats, err := rewriter.Rewrite(ctx)
392+
require.NoError(t, err)
393+
// The rewritten file must hold at least the n preserved values.
// NOTE(review): the check is a lower bound rather than equality;
// presumably the rewriter may retain additional values — confirm.
require.LessOrEqual(t, n, int(stats.ValueCount))
394+
newFile.metadata.Physical.ValueSize = stats.UncompressedValueBytes
395+
newFile.metadata.Physical.Size = stats.FileLen
396+
397+
// Verify that the rewritten blob file contains the correct values, and
398+
// that they may still be accessed using the original handles.
// The fetcher's file mapping resolves every blob file ID to the newly
// written file, so all reads are served from the rewrite output.
399+
var valueFetcher blob.ValueFetcher
400+
valueFetcher.Init(constantFileMapping(newBlobFileNum), fch, readEnv)
401+
// Run the checks in a closure so the fetcher is closed at the end of this
// iteration rather than when the test returns.
func() {
402+
defer func() { _ = valueFetcher.Close() }()
403+
for _, valueIndex := range newFile.valueIndices {
404+
handle := handles[valueIndex]
405+
val, _, err := valueFetcher.Fetch(ctx, blobFileID, handle.BlockID, handle.ValueID)
406+
require.NoError(t, err)
407+
require.Equal(t, values[valueIndex], val)
408+
}
409+
}()
410+
411+
// Add the new blob file to the list of blob files so that a future
412+
// rewrite can use it as the source file. This ensures we test rewrites
413+
// of rewritten blob files.
414+
files = append(files, newFile)
415+
}
416+
}
417+
418+
// constantFileMapping implements blob.FileMapping by resolving every blob file
// ID to the single disk file number it wraps.
419+
type constantFileMapping base.DiskFileNum
420+
421+
// Assert that constantFileMapping implements blob.FileMapping.
422+
var _ blob.FileMapping = constantFileMapping(0)
423+
424+
// Lookup ignores fileID and always returns m's disk file number together with
// true.
func (m constantFileMapping) Lookup(fileID base.BlobFileID) (base.DiskFileNum, bool) {
425+
return base.DiskFileNum(m), true
426+
}

0 commit comments

Comments
 (0)