Skip to content

Commit e2a0f83

Browse files
committed
replay: adding ingestion testing
this commit adds ingestion tests on replay package for improving benchmarks and removing any mismatch between workload and replay database
1 parent 38de796 commit e2a0f83

File tree

8 files changed

+436
-4
lines changed

8 files changed

+436
-4
lines changed

compaction.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1593,6 +1593,7 @@ func (d *DB) runIngestFlush(c *tableCompaction) (*manifest.VersionEdit, error) {
15931593
Bounds: exciseBounds,
15941594
SeqNum: ingestFlushable.exciseSeqNum,
15951595
})
1596+
d.mu.versions.metrics.Ingest.ExciseIngestCount++
15961597
}
15971598
// Iterate through all levels and find files that intersect with exciseSpan.
15981599
for layer, ls := range version.AllLevelsAndSublevels() {

ingest.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2082,6 +2082,7 @@ func (d *DB) ingestApply(
20822082
var exciseBounds base.UserKeyBounds
20832083
if exciseSpan.Valid() {
20842084
exciseBounds = exciseSpan.UserKeyBounds()
2085+
d.mu.versions.metrics.Ingest.ExciseIngestCount++
20852086
// Iterate through all levels and find files that intersect with exciseSpan.
20862087
//
20872088
// TODO(bilal): We could drop the DB mutex here as we don't need it for

internal/datatest/datatest.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@
77
package datatest
88

99
import (
10+
"context"
1011
"strings"
1112
"sync"
1213

1314
"github.com/cockroachdb/datadriven"
1415
"github.com/cockroachdb/errors"
1516
"github.com/cockroachdb/pebble"
17+
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
18+
"github.com/cockroachdb/pebble/sstable"
19+
"github.com/cockroachdb/pebble/vfs"
1620
)
1721

1822
// TODO(jackson): Consider a refactoring that can consolidate this package and
@@ -138,3 +142,76 @@ func (cql *CompactionTracker) WaitForInflightCompactionsToEqual(target int) {
138142
}
139143
cql.L.Unlock()
140144
}
145+
146+
// Below functions are more or less replica from data_test.go pebble package
147+
func RunBuildSSTCmd(
148+
input string,
149+
writerArgs []datadriven.CmdArg,
150+
path string,
151+
fs vfs.FS,
152+
opts ...func(*dataDrivenCmdOptions),
153+
) (sstable.WriterMetadata, error) {
154+
ddOpts := combineDataDrivenOpts(opts...)
155+
156+
writerOpts := ddOpts.defaultWriterOpts
157+
if err := sstable.ParseWriterOptions(&writerOpts, writerArgs...); err != nil {
158+
return sstable.WriterMetadata{}, err
159+
}
160+
161+
f, err := fs.Create(path, vfs.WriteCategoryUnspecified)
162+
if err != nil {
163+
return sstable.WriterMetadata{}, err
164+
}
165+
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writerOpts)
166+
if err := sstable.ParseTestSST(w.Raw(), input, nil /* bv */); err != nil {
167+
return sstable.WriterMetadata{}, err
168+
}
169+
if err := w.Close(); err != nil {
170+
return sstable.WriterMetadata{}, err
171+
}
172+
metadata, err := w.Metadata()
173+
if err != nil {
174+
return sstable.WriterMetadata{}, err
175+
}
176+
return *metadata, nil
177+
}
178+
179+
func combineDataDrivenOpts(opts ...func(*dataDrivenCmdOptions)) dataDrivenCmdOptions {
180+
combined := dataDrivenCmdOptions{}
181+
for _, opt := range opts {
182+
opt(&combined)
183+
}
184+
return combined
185+
}
186+
187+
type dataDrivenCmdOptions struct {
188+
defaultWriterOpts sstable.WriterOptions
189+
}
190+
191+
func WithDefaultWriterOpts(defaultWriterOpts sstable.WriterOptions) func(*dataDrivenCmdOptions) {
192+
return func(o *dataDrivenCmdOptions) { o.defaultWriterOpts = defaultWriterOpts }
193+
}
194+
195+
func RunIngestAndExciseCmd(td *datadriven.TestData, d *pebble.DB) error {
196+
paths := make([]string, 0)
197+
var exciseSpan pebble.KeyRange
198+
for i := range td.CmdArgs {
199+
if strings.HasSuffix(td.CmdArgs[i].Key, ".sst") {
200+
paths = append(paths, td.CmdArgs[i].Key)
201+
} else if td.CmdArgs[i].Key == "excise" {
202+
if len(td.CmdArgs[i].Vals) != 1 {
203+
return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"")
204+
}
205+
fields := strings.Split(td.CmdArgs[i].Vals[0], "-")
206+
if len(fields) != 2 {
207+
return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"")
208+
}
209+
exciseSpan.Start = []byte(fields[0])
210+
exciseSpan.End = []byte(fields[1])
211+
}
212+
}
213+
if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan); err != nil {
214+
return err
215+
}
216+
return nil
217+
}

metrics.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,8 @@ type Metrics struct {
258258
Ingest struct {
259259
// The total number of ingestions
260260
Count uint64
261+
// The number of excise operations during ingestion
262+
ExciseIngestCount int64
261263
}
262264

263265
Flush struct {

replay/replay.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ type Metrics struct {
118118
// effective heuristics are at ingesting files into lower levels, saving
119119
// write amplification.
120120
BytesWeightedByLevel uint64
121+
ExciseIngestCount int64
121122
}
122123
// PaceDuration is the time waiting for the pacer to allow the workload to
123124
// continue.
@@ -204,6 +205,9 @@ func (m *Metrics) WriteBenchmarkString(name string, w io.Writer) error {
204205
{label: "EstimatedDebt/max", values: []benchfmt.Value{
205206
{Value: float64(m.EstimatedDebt.Max()), Unit: "bytes"},
206207
}},
208+
{label: "ExciseDuringIngestion", values: []benchfmt.Value{
209+
{Value: float64(m.Ingest.ExciseIngestCount), Unit: "excise"},
210+
}},
207211
{label: "FlushUtilization", values: []benchfmt.Value{
208212
{Value: m.Final.Flush.WriteThroughput.Utilization(), Unit: "util"},
209213
}},
@@ -563,6 +567,7 @@ func (r *Runner) Wait() (Metrics, error) {
563567
m.CompactionCounts.Rewrite = pm.Compact.RewriteCount
564568
m.CompactionCounts.Copy = pm.Compact.CopyCount
565569
m.CompactionCounts.MultiLevel = pm.Compact.MultiLevelCount
570+
m.Ingest.ExciseIngestCount = pm.Ingest.ExciseIngestCount
566571
m.Ingest.BytesIntoL0 = pm.Levels[0].TableBytesIngested
567572
m.Ingest.BytesWeightedByLevel = ingestBytesWeighted
568573
return m, err
@@ -584,8 +589,10 @@ type workloadStep struct {
584589
// readAmp estimation for the LSM *before* ve was applied.
585590
previousReadAmp int
586591
// non-nil for flushStepKind
587-
flushBatch *pebble.Batch
588-
tablesToIngest []string
592+
flushBatch *pebble.Batch
593+
tablesToIngest []string
594+
// exciseSpan is set for ingestAndExciseStepKind
595+
exciseSpan pebble.KeyRange
589596
cumulativeWriteBytes uint64
590597
}
591598

@@ -595,6 +602,7 @@ const (
595602
flushStepKind stepKind = iota
596603
ingestStepKind
597604
compactionStepKind
605+
ingestAndExciseStepKind
598606
)
599607

600608
// eventListener returns a Pebble EventListener that is installed on the replay
@@ -692,6 +700,12 @@ func (r *Runner) applyWorkloadSteps(ctx context.Context) error {
692700
}
693701
r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
694702
r.stepsApplied <- step
703+
case ingestAndExciseStepKind:
704+
if _, err := r.d.IngestAndExcise(context.Background(), step.tablesToIngest, nil /* shared */, nil /* external */, step.exciseSpan); err != nil {
705+
return err
706+
}
707+
r.metrics.writeBytes.Store(step.cumulativeWriteBytes)
708+
r.stepsApplied <- step
695709
case compactionStepKind:
696710
// No-op.
697711
// TODO(jackson): Should we elide this earlier?
@@ -795,12 +809,22 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
795809
// flush.
796810
s.kind = ingestStepKind
797811
}
812+
if len(ve.ExciseBoundsRecord) > 0 {
813+
// If a version edit contains excise bounds records, it's an excise operation.
814+
// In practice, there should typically be only one excise bounds record per version edit.
815+
exciseEntry := ve.ExciseBoundsRecord[0]
816+
s.exciseSpan = pebble.KeyRange{
817+
Start: exciseEntry.Bounds.Start,
818+
End: exciseEntry.Bounds.End.Key,
819+
}
820+
s.kind = ingestAndExciseStepKind
821+
}
798822
var newFiles []base.DiskFileNum
799823
blobRefMap := make(map[base.DiskFileNum]manifest.BlobReferences)
800824
blobFileMap := make(map[base.BlobFileID]base.DiskFileNum)
801825
for _, nf := range ve.NewTables {
802826
newFiles = append(newFiles, nf.Meta.TableBacking.DiskFileNum)
803-
if s.kind == ingestStepKind && (nf.Meta.SmallestSeqNum != nf.Meta.LargestSeqNum || nf.Level != 0) {
827+
if s.kind == ingestStepKind && (nf.Meta.SmallestSeqNum != nf.Meta.LargestSeqNum) {
804828
s.kind = flushStepKind
805829
}
806830
if nf.Meta.BlobReferenceDepth > 0 {
@@ -870,7 +894,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
870894
return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset())
871895
}
872896
cumulativeWriteBytes += uint64(s.flushBatch.Len())
873-
case ingestStepKind:
897+
case ingestStepKind, ingestAndExciseStepKind:
874898
// Copy the ingested sstables into a staging area within the
875899
// run dir. This is necessary for two reasons:
876900
// a) Ingest will remove the source file, and we don't want

replay/replay_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/pebble/internal/invariants"
2828
"github.com/cockroachdb/pebble/internal/testkeys"
2929
"github.com/cockroachdb/pebble/rangekey"
30+
"github.com/cockroachdb/pebble/sstable"
3031
"github.com/cockroachdb/pebble/vfs"
3132
"github.com/stretchr/testify/require"
3233
)
@@ -166,6 +167,10 @@ func TestReplayValSep(t *testing.T) {
166167
runReplayTest(t, "testdata/replay_val_sep")
167168
}
168169

170+
func TestReplayIngest(t *testing.T) {
171+
runReplayTest(t, "testdata/replay_ingest")
172+
}
173+
169174
func TestLoadFlushedSSTableKeys(t *testing.T) {
170175
var buf bytes.Buffer
171176
var diskFileNums []base.DiskFileNum
@@ -283,6 +288,34 @@ func collectCorpus(t *testing.T, fs *vfs.MemFS, name string) {
283288
return err.Error()
284289
}
285290
return ""
291+
case "build-sst":
292+
writerOpts := sstable.WriterOptions{
293+
Comparer: testkeys.Comparer,
294+
}
295+
sstPath := td.CmdArgs[0].Key
296+
writerOpts.TableFormat = sstable.TableFormatPebblev7
297+
298+
_, err := datatest.RunBuildSSTCmd(td.Input, td.CmdArgs, sstPath, fs, datatest.WithDefaultWriterOpts(writerOpts))
299+
if err != nil {
300+
return err.Error()
301+
}
302+
return ""
303+
case "ingest":
304+
paths := make([]string, 0)
305+
for i := range td.CmdArgs {
306+
if strings.HasSuffix(td.CmdArgs[i].Key, ".sst") {
307+
paths = append(paths, td.CmdArgs[i].Key)
308+
}
309+
}
310+
if err := d.Ingest(context.Background(), paths); err != nil {
311+
return err.Error()
312+
}
313+
return "ingested"
314+
case "ingest-and-excise":
315+
if err := datatest.RunIngestAndExciseCmd(td, d); err != nil {
316+
return err.Error()
317+
}
318+
return "ingest-and-excised"
286319
case "flush":
287320
require.NoError(t, d.Flush())
288321
return ""
@@ -329,6 +362,21 @@ func collectCorpus(t *testing.T, fs *vfs.MemFS, name string) {
329362
d, err = pebble.Open("build", opts)
330363
require.NoError(t, err)
331364
return ""
365+
case "open-ingest-excise":
366+
wc = NewWorkloadCollector("build")
367+
opts := &pebble.Options{
368+
Comparer: testkeys.Comparer,
369+
DisableAutomaticCompactions: true,
370+
FormatMajorVersion: pebble.FormatExciseBoundsRecord,
371+
FS: fs,
372+
MaxManifestFileSize: 156,
373+
}
374+
setDefaultExperimentalOpts(opts)
375+
wc.Attach(opts)
376+
var err error
377+
d, err = pebble.Open("build", opts)
378+
require.NoError(t, err)
379+
return ""
332380
case "close":
333381
err := d.Close()
334382
require.NoError(t, err)
@@ -492,6 +540,7 @@ BenchmarkBenchmarkReplay/tpcc/DurationQuiescing 1 0.5 sec/op
492540
BenchmarkBenchmarkReplay/tpcc/DurationPaceDelay 1 0.25 sec/op
493541
BenchmarkBenchmarkReplay/tpcc/EstimatedDebt/mean 1 1.6777216e+08 bytes
494542
BenchmarkBenchmarkReplay/tpcc/EstimatedDebt/max 1 1.6777216e+08 bytes
543+
BenchmarkBenchmarkReplay/tpcc/ExciseDuringIngestion 1 0 excise
495544
BenchmarkBenchmarkReplay/tpcc/FlushUtilization 1 0 util
496545
BenchmarkBenchmarkReplay/tpcc/IngestedIntoL0 1 5.24288e+06 bytes
497546
BenchmarkBenchmarkReplay/tpcc/IngestWeightedByLevel 1 9.437184e+06 bytes

0 commit comments

Comments
 (0)