Skip to content

Commit 1c9b3f3

Browse files
committed
db, valsep: make span ValueStoragePolicy a struct defining the min size
ValueStoragePolicy in SpanPolicy is now a struct defining the policy adjustment type (defaultPolicy, noValSeparation, override) and the override value for the minimum size used for value separation. Compactions now use the ValueStoragePolicy returned by the span policy for a table's key range to decide whether we can preserve blob references or not. Previously, we would rewrite blob references if a compaction contained tables with mixed value storage policies, even if those policies were unchanged. We now preserve blob references if: - The maximum blob reference depth is not exceeded. - For each input table, the value separation policy used when writing the table matches the current value separation policy applied to the table span.
1 parent 1df7d4a commit 1c9b3f3

File tree

11 files changed

+356
-127
lines changed

11 files changed

+356
-127
lines changed

blob_rewrite_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,7 @@ func TestBlobRewrite(t *testing.T) {
9292
pbr := valsep.NewPreserveAllHotBlobReferences(
9393
inputBlobPhysicalFiles,
9494
0, /* outputBlobReferenceDepth */
95-
sstable.ValueSeparationDefault,
96-
0, /* original minimumSize */
95+
0, /* minimumSize */
9796
)
9897
vs = pbr
9998
case "write-new-blob-files":

compaction.go

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ type tableCompaction struct {
250250
// b) rewrite blob files: The compaction will write eligible values to new
251251
// blob files. This consumes more write bandwidth because all values are
252252
// rewritten. However it restores locality.
253-
getValueSeparation func(JobID, *tableCompaction, ValueStoragePolicy) valsep.ValueSeparation
253+
getValueSeparation func(JobID, *tableCompaction) valsep.ValueSeparation
254254

255255
// startLevel is the level that is being compacted. Inputs from startLevel
256256
// and outputLevel will be merged to produce a set of outputLevel files.
@@ -542,7 +542,7 @@ func (c *tableCompaction) makeInfo(jobID JobID) CompactionInfo {
542542
return info
543543
}
544544

545-
type getValueSeparation func(JobID, *tableCompaction, ValueStoragePolicy) valsep.ValueSeparation
545+
type getValueSeparation func(JobID, *tableCompaction) valsep.ValueSeparation
546546

547547
// newCompaction constructs a compaction from the provided picked compaction.
548548
//
@@ -3416,38 +3416,27 @@ func (d *DB) compactAndWrite(
34163416
}
34173417
runner := compact.NewRunner(runnerCfg, iter)
34183418

3419-
// Determine the value separation policy for this compaction.
3420-
// We pass the value storage span policy if one applies for the entire
3421-
// compaction keyspace in order to preserve blob references.
3422-
// Note that the value storage policy may change per table if
3423-
// there are different span policies in effect for this output
3424-
// range.
3425-
var compactionValueStoragePolicy ValueStoragePolicy
3426-
var spanPolicyEndKey []byte
3419+
var spanPolicyValid bool
34273420
var spanPolicy SpanPolicy
3428-
spanPolicy, spanPolicyEndKey, err = d.opts.Experimental.SpanPolicyFunc(c.bounds.Start)
3429-
if err != nil {
3430-
return runner.Finish().WithError(err)
3431-
}
3432-
if len(spanPolicyEndKey) == 0 || d.cmp(c.bounds.End.Key, spanPolicyEndKey) < 0 {
3433-
compactionValueStoragePolicy = spanPolicy.ValueStoragePolicy
3434-
}
3421+
// If spanPolicyValid is true and spanPolicyEndKey is empty, then spanPolicy
3422+
// applies for the rest of the keyspace.
3423+
var spanPolicyEndKey []byte
34353424

3436-
valueSeparation := c.getValueSeparation(jobID, c, compactionValueStoragePolicy)
3425+
valueSeparation := c.getValueSeparation(jobID, c)
34373426
for runner.MoreDataToWrite() {
34383427
if c.cancel.Load() {
34393428
return runner.Finish().WithError(ErrCancelledCompaction)
34403429
}
34413430
// Create a new table.
34423431
firstKey := runner.FirstKey()
3443-
if len(spanPolicyEndKey) > 0 && d.cmp(firstKey, spanPolicyEndKey) >= 0 {
3432+
if !spanPolicyValid || (len(spanPolicyEndKey) > 0 && d.cmp(firstKey, spanPolicyEndKey) >= 0) {
34443433
var err error
34453434
spanPolicy, spanPolicyEndKey, err = d.opts.Experimental.SpanPolicyFunc(firstKey)
34463435
if err != nil {
34473436
return runner.Finish().WithError(err)
34483437
}
3438+
spanPolicyValid = true
34493439
}
3450-
34513440
writerOpts := d.makeWriterOptions(c.outputLevel.level)
34523441
if spanPolicy.DisableValueSeparationBySuffix {
34533442
writerOpts.DisableValueBlocks = true
@@ -3456,14 +3445,16 @@ func (d *DB) compactAndWrite(
34563445
writerOpts.Compression = block.FastestCompression
34573446
}
34583447
vSep := valueSeparation
3459-
switch spanPolicy.ValueStoragePolicy {
3460-
case ValueStorageLowReadLatency:
3448+
switch spanPolicy.ValueStoragePolicy.PolicyAdjustment {
3449+
case UseDefaultValueStorage:
3450+
// No change to value separation.
3451+
case NoValueSeparation:
34613452
vSep = valsep.NeverSeparateValues{}
3462-
case ValueStorageLatencyTolerant:
3453+
case OverrideValueStorage:
34633454
// This span of keyspace is more tolerant of latency, so set a more
34643455
// aggressive value separation policy for this output.
34653456
vSep.SetNextOutputConfig(valsep.ValueSeparationOutputConfig{
3466-
MinimumSize: latencyTolerantMinimumSize,
3457+
MinimumSize: spanPolicy.ValueStoragePolicy.MinimumSize,
34673458
})
34683459
}
34693460
objMeta, tw, err := d.newCompactionOutputTable(jobID, c, writerOpts)

compaction_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1507,6 +1507,48 @@ func runCompactionTest(
15071507
s := blobRewriteLog.String()
15081508
return s
15091509

1510+
case "set-span-policies":
1511+
var spanPolicies []SpanAndPolicy
1512+
for _, line := range strings.Split(td.Input, "\n") {
1513+
line = strings.TrimSpace(line)
1514+
args := strings.Fields(line)
1515+
if len(args) < 2 {
1516+
td.Fatalf(t, "expected span and policy, got: %s", line)
1517+
}
1518+
// First arg should be keys in the form <start>,<end>
1519+
keys := strings.Split(args[0], ",")
1520+
keyRange := KeyRange{
1521+
Start: []byte(keys[0]),
1522+
End: []byte(keys[1]),
1523+
}
1524+
policy := SpanPolicy{}
1525+
args = args[1:]
1526+
for _, arg := range args {
1527+
parts := strings.Split(arg, "=")
1528+
switch parts[0] {
1529+
case "val-sep-minimum-size":
1530+
if len(parts) != 2 {
1531+
td.Fatalf(t, "expected val-sep-minimum-size=<size>, got: %s", arg)
1532+
}
1533+
size, err := strconv.ParseUint(parts[1], 10, 64)
1534+
if err != nil {
1535+
td.Fatalf(t, "parsing value-minimum-size: %s", err)
1536+
}
1537+
policy.ValueStoragePolicy.MinimumSize = int(size)
1538+
if size == 0 {
1539+
policy.ValueStoragePolicy.PolicyAdjustment = NoValueSeparation
1540+
} else if int(size) != d.opts.Experimental.ValueSeparationPolicy().MinimumSize {
1541+
policy.ValueStoragePolicy.PolicyAdjustment = OverrideValueStorage
1542+
}
1543+
}
1544+
}
1545+
spanPolicies = append(spanPolicies, SpanAndPolicy{
1546+
KeyRange: keyRange,
1547+
Policy: policy,
1548+
})
1549+
}
1550+
d.opts.Experimental.SpanPolicyFunc = MakeStaticSpanPolicyFunc(d.cmp, spanPolicies...)
1551+
return ""
15101552
default:
15111553
return fmt.Sprintf("unknown command: %s", td.Cmd)
15121554
}

compaction_value_separation.go

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,11 @@ import (
99
"github.com/cockroachdb/pebble/internal/base"
1010
"github.com/cockroachdb/pebble/internal/manifest"
1111
"github.com/cockroachdb/pebble/objstorage"
12-
"github.com/cockroachdb/pebble/sstable"
1312
"github.com/cockroachdb/pebble/valsep"
1413
"github.com/cockroachdb/redact"
1514
)
1615

17-
// latencyTolerantMinimumSize is the minimum size, in bytes, of a value that
18-
// will be separated into a blob file when the value storage policy is
19-
// ValueStorageLatencyTolerant.
20-
const latencyTolerantMinimumSize = 10
21-
22-
var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction, ValueStoragePolicy) valsep.ValueSeparation {
16+
var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction) valsep.ValueSeparation {
2317
return valsep.NeverSeparateValues{}
2418
}
2519

@@ -29,7 +23,7 @@ var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction, Value
2923
//
3024
// It assumes that the compaction will write tables at d.TableFormat() or above.
3125
func (d *DB) determineCompactionValueSeparation(
32-
jobID JobID, c *tableCompaction, valueStorage ValueStoragePolicy,
26+
jobID JobID, c *tableCompaction,
3327
) valsep.ValueSeparation {
3428
if d.FormatMajorVersion() < FormatValueSeparation ||
3529
d.opts.Experimental.ValueSeparationPolicy == nil {
@@ -48,25 +42,12 @@ func (d *DB) determineCompactionValueSeparation(
4842
// For flushes, c.version is nil.
4943
blobFileSet = uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs)
5044
}
51-
minSize := policy.MinimumSize
52-
switch valueStorage {
53-
case ValueStorageLowReadLatency:
54-
return valsep.NeverSeparateValues{}
55-
case ValueStorageLatencyTolerant:
56-
minSize = latencyTolerantMinimumSize
57-
default:
58-
}
59-
if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy, uint64(minSize)); !writeBlobs {
45+
if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy, d.opts.Experimental.SpanPolicyFunc, d.cmp); !writeBlobs {
6046
// This compaction should preserve existing blob references.
61-
kind := sstable.ValueSeparationDefault
62-
if valueStorage != ValueStorageDefault {
63-
kind = sstable.ValueSeparationSpanPolicy
64-
}
6547
return valsep.NewPreserveAllHotBlobReferences(
6648
blobFileSet,
6749
outputBlobReferenceDepth,
68-
kind,
69-
minSize,
50+
policy.MinimumSize,
7051
)
7152
}
7253

@@ -101,7 +82,7 @@ func (d *DB) determineCompactionValueSeparation(
10182
// maximum blob reference depth to assign to output sstables (the actual value
10283
// may be lower iff the output table references fewer distinct blob files).
10384
func shouldWriteBlobFiles(
104-
c *tableCompaction, policy ValueSeparationPolicy, minimumValueSizeForCompaction uint64,
85+
c *tableCompaction, policy ValueSeparationPolicy, spanPolicyFunc SpanPolicyFunc, cmp Compare,
10586
) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
10687
// Flushes will have no existing references to blob files and should write
10788
// their values to new blob files.
@@ -124,6 +105,7 @@ func shouldWriteBlobFiles(
124105
// We should try to write to new blob files.
125106
return true, 0
126107
}
108+
127109
// If the compaction's output blob reference depth would be greater than the
128110
// configured max, we should rewrite the values into new blob files to
129111
// restore locality.
@@ -140,7 +122,32 @@ func shouldWriteBlobFiles(
140122
if !backingPropsValid {
141123
continue
142124
}
143-
if backingProps.ValueSeparationMinSize != minimumValueSizeForCompaction {
125+
126+
var expectedMinSize int
127+
bounds := t.UserKeyBounds()
128+
spanPolicy, spanPolicyEndKey, err := spanPolicyFunc(bounds.Start)
129+
if err != nil {
130+
// For now, if we can't determine the span policy, we should just assume
131+
// the default policy is in effect for this table.
132+
expectedMinSize = policy.MinimumSize
133+
} else {
134+
if len(spanPolicyEndKey) > 0 && cmp(bounds.End.Key, spanPolicyEndKey) >= 0 {
135+
// The table's key range now uses multiple span policies. Rewrite to new
136+
// blob files so values are stored according to the current policy.
137+
return true, 0
138+
}
139+
switch spanPolicy.ValueStoragePolicy.PolicyAdjustment {
140+
case UseDefaultValueStorage:
141+
// Use the global policy's minimum size.
142+
expectedMinSize = policy.MinimumSize
143+
case OverrideValueStorage:
144+
expectedMinSize = spanPolicy.ValueStoragePolicy.MinimumSize
145+
case NoValueSeparation:
146+
expectedMinSize = 0
147+
}
148+
}
149+
150+
if int(backingProps.ValueSeparationMinSize) != expectedMinSize {
144151
return true, 0
145152
}
146153
}

data_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,6 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
918918
valueSeparator.pbr = valsep.NewPreserveAllHotBlobReferences(
919919
valueSeparator.metas,
920920
0, /* outputreference depth */
921-
sstable.ValueSeparationDefault,
922921
d.opts.Experimental.ValueSeparationPolicy().MinimumSize,
923922
)
924923

@@ -937,7 +936,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
937936
flushable: mem,
938937
flushed: make(chan struct{}),
939938
}}
940-
getValueSeparator := func(JobID, *tableCompaction, ValueStoragePolicy) valsep.ValueSeparation {
939+
getValueSeparator := func(JobID, *tableCompaction) valsep.ValueSeparation {
941940
return valueSeparator
942941
}
943942
c, err := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.latest.l0Organizer,

options.go

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,7 +1294,7 @@ type SpanPolicy struct {
12941294

12951295
// ValueStoragePolicy is a hint used to determine where to store the values
12961296
// for KVs.
1297-
ValueStoragePolicy ValueStoragePolicy
1297+
ValueStoragePolicy ValueStoragePolicyAdjustment
12981298
}
12991299

13001300
// String returns a string representation of the SpanPolicy.
@@ -1306,38 +1306,64 @@ func (p SpanPolicy) String() string {
13061306
if p.DisableValueSeparationBySuffix {
13071307
sb.WriteString("disable-value-separation-by-suffix,")
13081308
}
1309-
switch p.ValueStoragePolicy {
1310-
case ValueStorageLowReadLatency:
1311-
sb.WriteString("low-read-latency,")
1312-
case ValueStorageLatencyTolerant:
1313-
sb.WriteString("latency-tolerant,")
1309+
switch p.ValueStoragePolicy.PolicyAdjustment {
1310+
case NoValueSeparation:
1311+
sb.WriteString("no-value-separation,")
1312+
case OverrideValueStorage:
1313+
sb.WriteString("override,")
13141314
}
13151315
return strings.TrimSuffix(sb.String(), ",")
13161316
}
13171317

1318-
// ValueStoragePolicy is a hint used to determine where to store the values for
1319-
// KVs.
1320-
type ValueStoragePolicy uint8
1318+
// ValueStoragePolicyAdjustment is used to determine where to store the values for
1319+
// KVs. If the PolicyAdjustment specified is OverrideValueStorage, the remaining fields
1320+
// are used to override the global configuration for value separation.
1321+
type ValueStoragePolicyAdjustment struct {
1322+
// PolicyAdjustment specifies the policy adjustment to apply.
1323+
PolicyAdjustment ValueStoragePolicyAdjustmentType
1324+
1325+
// Remaining fields are ignored, unless the PolicyAdjustment is OverrideValueStorage.
1326+
1327+
// MinimumSize is the minimum size of the value.
1328+
MinimumSize int
1329+
}
1330+
1331+
// ValueStoragePolicyAdjustmentType is a hint used to determine where to store the
1332+
// values for KVs.
1333+
type ValueStoragePolicyAdjustmentType uint8
13211334

13221335
const (
1323-
// ValueStorageDefault is the default value; Pebble will respect global
1324-
// configuration for value blocks and value separation.
1325-
ValueStorageDefault ValueStoragePolicy = iota
1336+
// UseDefaultValueStorage is the default value; Pebble will respect global
1337+
// configuration for value separation.
1338+
UseDefaultValueStorage ValueStoragePolicyAdjustmentType = iota
13261339

1327-
// ValueStorageLowReadLatency indicates Pebble should prefer storing values
1340+
// NoValueSeparation indicates Pebble should prefer storing values
13281341
// in-place.
1329-
ValueStorageLowReadLatency
1330-
1331-
// ValueStorageLatencyTolerant indicates value retrieval can tolerate
1332-
// additional latency, so Pebble should aggressively prefer storing values
1333-
// separately if it can reduce write amplification.
1334-
//
1335-
// If the global Options' enable value separation, Pebble may choose to
1336-
// separate values under the LatencyTolerant policy even if they do not meet
1337-
// the minimum size threshold of the global Options' ValueSeparationPolicy.
1338-
ValueStorageLatencyTolerant
1342+
NoValueSeparation
1343+
1344+
// OverrideValueStorage indicates that value separation thresholds (see
1345+
// valsep.ValueSeparationOutputConfig) for this key range are being
1346+
// overridden from a SpanPolicy. If the global Options enable value
1347+
// separation, Pebble will separate values under the OverrideValueStorage
1348+
// policy even if they do not meet the minimum size threshold of the
1349+
// global Options' ValueSeparationPolicy.
1350+
OverrideValueStorage
13391351
)
13401352

1353+
// ValueStorageLatencyTolerant is the suggested ValueStoragePolicyAdjustment
1354+
// to use for key ranges that can tolerate higher value retrieval
1355+
// latency.
1356+
var ValueStorageLatencyTolerant = ValueStoragePolicyAdjustment{
1357+
PolicyAdjustment: OverrideValueStorage,
1358+
MinimumSize: 10,
1359+
}
1360+
1361+
// ValueStorageLowReadLatency is the suggested ValueStoragePolicyAdjustment
1362+
// to use for key ranges that require low value retrieval latency.
1363+
var ValueStorageLowReadLatency = ValueStoragePolicyAdjustment{
1364+
PolicyAdjustment: NoValueSeparation,
1365+
}
1366+
13411367
// SpanPolicyFunc is used to determine the SpanPolicy for a key region.
13421368
//
13431369
// The returned policy is valid from the start key until (and not including) the

sstable/properties.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,8 @@ const (
241241
ValueSeparationNone ValueSeparationKind = iota
242242

243243
// ValueSeparationDefault indicates that values were separated into
244-
// new blob files during writing.
244+
// new blob files during writing using the default value separation
245+
// policy settings for minimum sizes.
245246
ValueSeparationDefault
246247

247248
// ValueSeparationSpanPolicy indicates that values were separated

0 commit comments

Comments
 (0)