Skip to content

Commit f19adee

Browse files
committed
db: remove MaxWriterConcurrency and CPUWorkPermissionGranter
Remove the MaxWriterConcurrency option and the CPUWorkPermissionGranter interface. The new columnar-block sstable writer does not support parallel compression. We expect we do not want to parallelize work beneath the granularity of compactions, in part because it is difficult to integrate with admission control.
1 parent 9630202 commit f19adee

File tree

9 files changed

+12
-142
lines changed

9 files changed

+12
-142
lines changed

compaction.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3118,12 +3118,11 @@ func (d *DB) compactAndWrite(
31183118
}
31193119
// Create a new table.
31203120
writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
3121-
objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts)
3121+
objMeta, tw, err := d.newCompactionOutput(jobID, c, writerOpts)
31223122
if err != nil {
31233123
return runner.Finish().WithError(err)
31243124
}
31253125
runner.WriteTable(objMeta, tw)
3126-
d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
31273126
}
31283127
result = runner.Finish()
31293128
if result.Err == nil {
@@ -3246,10 +3245,10 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
32463245
// compaction or flush.
32473246
func (d *DB) newCompactionOutput(
32483247
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
3249-
) (objstorage.ObjectMetadata, sstable.RawWriter, CPUWorkHandle, error) {
3248+
) (objstorage.ObjectMetadata, sstable.RawWriter, error) {
32503249
writable, objMeta, err := d.newCompactionOutputObj(jobID, c, base.FileTypeTable)
32513250
if err != nil {
3252-
return objstorage.ObjectMetadata{}, nil, nil, err
3251+
return objstorage.ObjectMetadata{}, nil, err
32533252
}
32543253

32553254
var reason string
@@ -3272,16 +3271,10 @@ func (d *DB) newCompactionOutput(
32723271
},
32733272
})
32743273

3275-
const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
3276-
cpuWorkHandle := d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
3277-
MaxFileWriteAdditionalCPUTime,
3278-
)
3279-
writerOpts.Parallelism =
3280-
d.opts.Experimental.MaxWriterConcurrency > 0 &&
3281-
(cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
3274+
writerOpts.Parallelism = d.opts.Experimental.ForceWriterParallelism
32823275

32833276
tw := sstable.NewRawWriter(writable, writerOpts)
3284-
return objMeta, tw, cpuWorkHandle, nil
3277+
return objMeta, tw, nil
32853278
}
32863279

32873280
// newCompactionOutputObj creates an object produced by a compaction or flush.

compaction_test.go

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -541,73 +541,6 @@ func TestPickCompaction(t *testing.T) {
541541
}
542542
}
543543

544-
type cpuPermissionGranter struct {
545-
// requestCount is used to confirm that every GetPermission function call
546-
// has a corresponding CPUWorkDone function call.
547-
requestCount int
548-
used bool
549-
permit bool
550-
}
551-
552-
type cpuWorkHandle struct {
553-
permit bool
554-
}
555-
556-
func (c cpuWorkHandle) Permitted() bool {
557-
return c.permit
558-
}
559-
560-
func (t *cpuPermissionGranter) GetPermission(dur time.Duration) CPUWorkHandle {
561-
t.requestCount++
562-
t.used = true
563-
return cpuWorkHandle{t.permit}
564-
}
565-
566-
func (t *cpuPermissionGranter) CPUWorkDone(_ CPUWorkHandle) {
567-
t.requestCount--
568-
}
569-
570-
// Simple test to check if compactions are using the granter, and if exactly
571-
// the acquired handles are returned.
572-
func TestCompactionCPUGranter(t *testing.T) {
573-
mem := vfs.NewMem()
574-
opts := &Options{FS: mem}
575-
opts.WithFSDefaults()
576-
g := &cpuPermissionGranter{permit: true}
577-
opts.Experimental.CPUWorkPermissionGranter = g
578-
d, err := Open("", opts)
579-
if err != nil {
580-
t.Fatalf("Open: %v", err)
581-
}
582-
defer d.Close()
583-
584-
d.Set([]byte{'a'}, []byte{'a'}, nil)
585-
err = d.Compact([]byte{'a'}, []byte{'b'}, true)
586-
if err != nil {
587-
t.Fatalf("Compact: %v", err)
588-
}
589-
require.True(t, g.used)
590-
require.Equal(t, g.requestCount, 0)
591-
}
592-
593-
// Tests that there's no errors or panics when the default CPU granter is used.
594-
func TestCompactionCPUGranterDefault(t *testing.T) {
595-
mem := vfs.NewMem()
596-
opts := &Options{FS: mem}
597-
opts.WithFSDefaults()
598-
d, err := Open("", opts)
599-
if err != nil {
600-
t.Fatalf("Open: %v", err)
601-
}
602-
defer d.Close()
603-
604-
d.Set([]byte{'a'}, []byte{'a'}, nil)
605-
err = d.Compact([]byte{'a'}, []byte{'b'}, true)
606-
if err != nil {
607-
t.Fatalf("Compact: %v", err)
608-
}
609-
}
610-
611544
func TestCompaction(t *testing.T) {
612545
const memTableSize = 10000
613546
// Tuned so that 2 values can reside in the memtable before a flush, but a

db.go

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -220,40 +220,6 @@ type Writer interface {
220220
RangeKeyDelete(start, end []byte, opts *WriteOptions) error
221221
}
222222

223-
// CPUWorkHandle represents a handle used by the CPUWorkPermissionGranter API.
224-
type CPUWorkHandle interface {
225-
// Permitted indicates whether Pebble can use additional CPU resources.
226-
Permitted() bool
227-
}
228-
229-
// CPUWorkPermissionGranter is used to request permission to opportunistically
230-
// use additional CPUs to speed up internal background work.
231-
type CPUWorkPermissionGranter interface {
232-
// GetPermission returns a handle regardless of whether permission is granted
233-
// or not. In the latter case, the handle is only useful for recording
234-
// the CPU time actually spent on this calling goroutine.
235-
GetPermission(time.Duration) CPUWorkHandle
236-
// CPUWorkDone must be called regardless of whether CPUWorkHandle.Permitted
237-
// returns true or false.
238-
CPUWorkDone(CPUWorkHandle)
239-
}
240-
241-
// Use a default implementation for the CPU work granter to avoid excessive nil
242-
// checks in the code.
243-
type defaultCPUWorkHandle struct{}
244-
245-
func (d defaultCPUWorkHandle) Permitted() bool {
246-
return false
247-
}
248-
249-
type defaultCPUWorkGranter struct{}
250-
251-
func (d defaultCPUWorkGranter) GetPermission(_ time.Duration) CPUWorkHandle {
252-
return defaultCPUWorkHandle{}
253-
}
254-
255-
func (d defaultCPUWorkGranter) CPUWorkDone(_ CPUWorkHandle) {}
256-
257223
// DB provides a concurrent, persistent ordered key/value store.
258224
//
259225
// A DB's basic operations (Get, Set, Delete) should be self-explanatory. Get

metamorphic/options.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -738,10 +738,7 @@ func RandomOptions(
738738
}
739739
}
740740
if rng.IntN(4) == 0 {
741-
// Enable Writer parallelism for 25% of the random options. Setting
742-
// MaxWriterConcurrency to any value greater than or equal to 1 has the
743-
// same effect currently.
744-
opts.Experimental.MaxWriterConcurrency = 2
741+
// Enable Writer parallelism for 25% of the random options.
745742
opts.Experimental.ForceWriterParallelism = true
746743
}
747744
if rng.IntN(2) == 0 {

options.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -660,24 +660,12 @@ type Options struct {
660660
// compaction will never get triggered.
661661
MultiLevelCompactionHeuristic MultiLevelHeuristic
662662

663-
// MaxWriterConcurrency is used to indicate the maximum number of
664-
// compression workers the compression queue is allowed to use. If
665-
// MaxWriterConcurrency > 0, then the Writer will use parallelism, to
666-
// compress and write blocks to disk. Otherwise, the writer will
667-
// compress and write blocks to disk synchronously.
668-
MaxWriterConcurrency int
669-
670663
// ForceWriterParallelism is used to force parallelism in the sstable
671664
// Writer for the metamorphic tests. Even with the MaxWriterConcurrency
672665
// option set, we only enable parallelism in the sstable Writer if there
673666
// is enough CPU available, and this option bypasses that.
674667
ForceWriterParallelism bool
675668

676-
// CPUWorkPermissionGranter should be set if Pebble should be given the
677-
// ability to optionally schedule additional CPU. See the documentation
678-
// for CPUWorkPermissionGranter for more details.
679-
CPUWorkPermissionGranter CPUWorkPermissionGranter
680-
681669
// EnableColumnarBlocks is used to decide whether to enable writing
682670
// TableFormatPebblev5 sstables. This setting is only respected by
683671
// FormatColumnarBlocks. In lower format major versions, the
@@ -1287,9 +1275,6 @@ func (o *Options) EnsureDefaults() {
12871275
if o.Experimental.FileCacheShards <= 0 {
12881276
o.Experimental.FileCacheShards = runtime.GOMAXPROCS(0)
12891277
}
1290-
if o.Experimental.CPUWorkPermissionGranter == nil {
1291-
o.Experimental.CPUWorkPermissionGranter = defaultCPUWorkGranter{}
1292-
}
12931278
if o.Experimental.MultiLevelCompactionHeuristic == nil {
12941279
o.Experimental.MultiLevelCompactionHeuristic = WriteAmpHeuristic{}
12951280
}
@@ -1433,7 +1418,6 @@ func (o *Options) String() string {
14331418
fmt.Fprintf(&buf, " validate_on_ingest=%t\n", o.Experimental.ValidateOnIngest)
14341419
fmt.Fprintf(&buf, " wal_dir=%s\n", o.WALDir)
14351420
fmt.Fprintf(&buf, " wal_bytes_per_sync=%d\n", o.WALBytesPerSync)
1436-
fmt.Fprintf(&buf, " max_writer_concurrency=%d\n", o.Experimental.MaxWriterConcurrency)
14371421
fmt.Fprintf(&buf, " force_writer_parallelism=%t\n", o.Experimental.ForceWriterParallelism)
14381422
fmt.Fprintf(&buf, " secondary_cache_size_bytes=%d\n", o.Experimental.SecondaryCacheSizeBytes)
14391423
fmt.Fprintf(&buf, " create_on_shared=%d\n", o.Experimental.CreateOnShared)
@@ -1833,7 +1817,7 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
18331817
case "wal_bytes_per_sync":
18341818
o.WALBytesPerSync, err = strconv.Atoi(value)
18351819
case "max_writer_concurrency":
1836-
o.Experimental.MaxWriterConcurrency, err = strconv.Atoi(value)
1820+
// No longer implemented; ignore.
18371821
case "force_writer_parallelism":
18381822
o.Experimental.ForceWriterParallelism, err = strconv.ParseBool(value)
18391823
case "secondary_cache_size_bytes":

options_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ func TestDefaultOptionsString(t *testing.T) {
115115
validate_on_ingest=false
116116
wal_dir=
117117
wal_bytes_per_sync=0
118-
max_writer_concurrency=0
119118
force_writer_parallelism=false
120119
secondary_cache_size_bytes=0
121120
create_on_shared=0
@@ -296,7 +295,6 @@ func TestOptionsParse(t *testing.T) {
296295
opts.Experimental.DeletionSizeRatioThreshold = 0.7
297296
opts.Experimental.TombstoneDenseCompactionThreshold = 0.2
298297
opts.Experimental.FileCacheShards = 500
299-
opts.Experimental.MaxWriterConcurrency = 1
300298
opts.Experimental.ForceWriterParallelism = true
301299
opts.Experimental.SecondaryCacheSizeBytes = 1024
302300
opts.EnsureDefaults()

replay/testdata/replay

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ tree
1111
614 000007.sst
1212
0 LOCK
1313
133 MANIFEST-000001
14-
1418 OPTIONS-000003
14+
1391 OPTIONS-000003
1515
0 marker.format-version.000001.013
1616
0 marker.manifest.000001.MANIFEST-000001
1717
simple/
@@ -21,7 +21,7 @@ tree
2121
25 000004.log
2222
586 000005.sst
2323
85 MANIFEST-000001
24-
1418 OPTIONS-000003
24+
1391 OPTIONS-000003
2525
0 marker.format-version.000001.013
2626
0 marker.manifest.000001.MANIFEST-000001
2727

@@ -67,7 +67,6 @@ cat build/OPTIONS-000003
6767
validate_on_ingest=false
6868
wal_dir=
6969
wal_bytes_per_sync=0
70-
max_writer_concurrency=0
7170
force_writer_parallelism=false
7271
secondary_cache_size_bytes=0
7372
create_on_shared=0

replay/testdata/replay_paced

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ tree
1414
0 LOCK
1515
133 MANIFEST-000001
1616
205 MANIFEST-000010
17-
1418 OPTIONS-000003
17+
1391 OPTIONS-000003
1818
0 marker.format-version.000001.013
1919
0 marker.manifest.000002.MANIFEST-000010
2020
high_read_amp/
@@ -26,7 +26,7 @@ tree
2626
39 000008.log
2727
560 000009.sst
2828
157 MANIFEST-000010
29-
1418 OPTIONS-000003
29+
1391 OPTIONS-000003
3030
0 marker.format-version.000001.013
3131
0 marker.manifest.000001.MANIFEST-000010
3232

testdata/metrics

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ Iter category stats:
238238

239239
disk-usage
240240
----
241-
2.8KB
241+
2.7KB
242242

243243
# Closing iter b will release the last zombie sstable and the last zombie memtable.
244244

0 commit comments

Comments
 (0)