Skip to content

Commit c0b18bc

Browse files
committed
db: fix plumbing of CompactionGrantHandle
CompactionGrantHandle now includes the CPUMeasurer interface, that has a separate method for each kind of goroutine, instead of identifying the goroutine kind by an index. This includes the secondary goroutine for writing to blob files. This interface is plumbed to RawColumnWriter and blob.FileWriter, so that the secondary writing goroutines can report CPU after writing each block.
1 parent 64b58ad commit c0b18bc

File tree

7 files changed

+92
-20
lines changed

7 files changed

+92
-20
lines changed

compaction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3305,7 +3305,7 @@ func (d *DB) newCompactionOutput(
33053305
},
33063306
})
33073307

3308-
tw := sstable.NewRawWriter(writable, writerOpts)
3308+
tw := sstable.NewRawWriterWithCPUMeasurer(writable, writerOpts, c.grantHandle)
33093309
return objMeta, tw, nil
33103310
}
33113311

compaction_scheduler.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ import (
1414

1515
type CompactionGrantHandle = base.CompactionGrantHandle
1616
type CompactionGrantHandleStats = base.CompactionGrantHandleStats
17+
type CompactionGoroutineKind = base.CompactionGoroutineKind
18+
19+
const (
20+
CompactionGoroutinePrimary = base.CompactionGoroutinePrimary
21+
CompactionGoroutineSSTableSecondary = base.CompactionGoroutineSSTableSecondary
22+
CompactionGoroutineBlobFileSecondary = base.CompactionGoroutineBlobFileSecondary
23+
)
1724

1825
// NB: This interface is experimental and subject to change.
1926
//
@@ -157,7 +164,7 @@ type noopGrantHandle struct{}
157164
var _ CompactionGrantHandle = noopGrantHandle{}
158165

159166
func (h noopGrantHandle) Started() {}
160-
func (h noopGrantHandle) MeasureCPU(g int) {}
167+
func (h noopGrantHandle) MeasureCPU(CompactionGoroutineKind) {}
161168
func (h noopGrantHandle) CumulativeStats(stats base.CompactionGrantHandleStats) {}
162169
func (h noopGrantHandle) Done() {}
163170

@@ -333,10 +340,8 @@ func (s *ConcurrencyLimitScheduler) TrySchedule() (bool, CompactionGrantHandle)
333340
return false, nil
334341
}
335342

336-
func (s *ConcurrencyLimitScheduler) Started() {}
337-
338-
func (s *ConcurrencyLimitScheduler) MeasureCPU(g int) {}
339-
343+
func (s *ConcurrencyLimitScheduler) Started() {}
344+
func (s *ConcurrencyLimitScheduler) MeasureCPU(CompactionGoroutineKind) {}
340345
func (s *ConcurrencyLimitScheduler) CumulativeStats(stats base.CompactionGrantHandleStats) {}
341346

342347
func (s *ConcurrencyLimitScheduler) Done() {

internal/base/compaction_grant_handle.go

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,7 @@ type CompactionGrantHandle interface {
2020
// Started is called once and must precede calls to MeasureCPU and
2121
// CumulativeStats.
2222
Started()
23-
// MeasureCPU is used to measure the CPU consumption of a goroutine involved
24-
// in a compaction. It must be called from each of the two goroutines that
25-
// consume CPU during a compaction, and the first call must be before any
26-
// significant work is done, since the first call is used to initialize the
27-
// measurer for the goroutine. The parameter g must be 0 or 1, to
28-
// differentiate between the goroutines. If a compaction is only using one
29-
// goroutine, then it can skip calling MeasureCPU(1).
30-
MeasureCPU(g int)
23+
CPUMeasurer
3124
// CumulativeStats reports the current cumulative stats. This method may
3225
// block if the scheduler wants to pace the compaction (say to moderate its
3326
// consumption of disk write bandwidth).
@@ -39,3 +32,36 @@ type CompactionGrantHandle interface {
3932
// been installed.
4033
Done()
4134
}
35+
36+
// CompactionGoroutineKind identifies the kind of compaction goroutine.
37+
type CompactionGoroutineKind uint8
38+
39+
const (
40+
// CompactionGoroutinePrimary is the primary compaction goroutine that
41+
// iterates over key-value pairs in the input and calls the current sstable
42+
// writer and blob file writer.
43+
CompactionGoroutinePrimary CompactionGoroutineKind = iota
44+
// CompactionGoroutineSSTableSecondary is the secondary goroutine in the
45+
// current sstable writer that writes blocks to the sstable.
46+
CompactionGoroutineSSTableSecondary
47+
// CompactionGoroutineBlobFileSecondary is the secondary goroutine in the
48+
// current blob file writer that writes blocks to the blob file.
49+
CompactionGoroutineBlobFileSecondary
50+
)
51+
52+
// CPUMeasurer is used to measure the CPU consumption of goroutines involved
53+
// in a compaction.
54+
type CPUMeasurer interface {
55+
// MeasureCPU allows the measurer to keep track of CPU usage while a
56+
// compaction is ongoing. It is to be called regularly from the compaction
57+
// goroutine corresponding to the argument. The first call from a goroutine
58+
// must be done before any significant CPU consumption, since it is used to
59+
// initialize the measurer for the goroutine making the call. If a
60+
// compaction is not using a certain kind of goroutine, it can skip calling
61+
// this method with the corresponding argument.
62+
MeasureCPU(CompactionGoroutineKind)
63+
}
64+
65+
type NoopCPUMeasurer struct{}
66+
67+
func (NoopCPUMeasurer) MeasureCPU(CompactionGoroutineKind) {}

internal/compact/run.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,14 @@ func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ erro
273273
iteratedKeys++
274274
if iteratedKeys%updateGrantHandleEveryNKeys == 0 {
275275
r.cfg.GrantHandle.CumulativeStats(base.CompactionGrantHandleStats{
276+
// TODO(jackson): CumulativeWrittenSize does not include blob files
277+
// already written by this compaction, and
278+
// ValueSeparation.EstimatedFileSize does not either. So this
279+
// CumWriteBytes is incomplete.
276280
CumWriteBytes: r.stats.CumulativeWrittenSize + tw.EstimatedSize() +
277281
r.cfg.ValueSeparation.EstimatedFileSize(),
278282
})
279-
// TODO(sumeer): give the GrantHandle to the writer so it can account on
280-
// all its goroutines.
281-
r.cfg.GrantHandle.MeasureCPU(0)
283+
r.cfg.GrantHandle.MeasureCPU(base.CompactionGoroutinePrimary)
282284
}
283285
outputSize := tw.EstimatedSize()
284286
outputSize += r.cfg.ValueSeparation.EstimatedReferenceSize()
@@ -336,11 +338,15 @@ func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ erro
336338
r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
337339

338340
r.cfg.GrantHandle.CumulativeStats(base.CompactionGrantHandleStats{
341+
// TODO(jackson): CumulativeWrittenSize does not include blob files
342+
// already written by this compaction, and
343+
// ValueSeparation.EstimatedFileSize does not either. So this
344+
// CumWriteBytes is incomplete.
339345
CumWriteBytes: r.stats.CumulativeWrittenSize +
340346
tw.EstimatedSize() +
341347
r.cfg.ValueSeparation.EstimatedFileSize(),
342348
})
343-
r.cfg.GrantHandle.MeasureCPU(0)
349+
r.cfg.GrantHandle.MeasureCPU(base.CompactionGoroutinePrimary)
344350
return splitKey, nil
345351
}
346352

sstable/blob/blob.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ type FileWriterOptions struct {
5252
Compression block.Compression
5353
ChecksumType block.ChecksumType
5454
FlushGovernor block.FlushGovernor
55+
// Only CPUMeasurer.MeasureCPUBlobFileSecondary is used.
56+
CpuMeasurer base.CPUMeasurer
5557
}
5658

5759
func (o *FileWriterOptions) ensureDefaults() {
@@ -68,6 +70,9 @@ func (o *FileWriterOptions) ensureDefaults() {
6870
base.SizeClassAwareBlockSizeThreshold,
6971
nil)
7072
}
73+
if o.CpuMeasurer == nil {
74+
o.CpuMeasurer = base.NoopCPUMeasurer{}
75+
}
7176
}
7277

7378
// FileWriterStats aggregates statistics about a blob file written by a
@@ -98,6 +103,7 @@ type FileWriter struct {
98103
blockOffsets []uint64
99104
err error
100105
checksumType block.ChecksumType
106+
cpuMeasurer base.CPUMeasurer
101107
writeQueue struct {
102108
wg sync.WaitGroup
103109
ch chan compressedBlock
@@ -120,6 +126,7 @@ func NewFileWriter(fn base.DiskFileNum, w objstorage.Writable, opts FileWriterOp
120126
fw.b.Init(opts.Compression, opts.ChecksumType)
121127
fw.flushGov = opts.FlushGovernor
122128
fw.checksumType = opts.ChecksumType
129+
fw.cpuMeasurer = opts.CpuMeasurer
123130
fw.writeQueue.ch = make(chan compressedBlock)
124131
fw.writeQueue.wg.Add(1)
125132
go fw.drainWriteQueue()
@@ -173,8 +180,13 @@ func (w *FileWriter) flush() {
173180
// until the channel is closed. All value blocks are written by this goroutine.
174181
func (w *FileWriter) drainWriteQueue() {
175182
defer w.writeQueue.wg.Done()
183+
// Call once to initialize the CPU measurer.
184+
w.cpuMeasurer.MeasureCPU(base.CompactionGoroutineBlobFileSecondary)
176185
for cb := range w.writeQueue.ch {
177186
_, err := cb.pb.WriteTo(w.w)
187+
// Report to the CPU measurer immediately after writing (note that there
188+
// may be a time lag until the next block is available to write).
189+
w.cpuMeasurer.MeasureCPU(base.CompactionGoroutineBlobFileSecondary)
178190
if err != nil {
179191
w.writeQueue.err = err
180192
continue

sstable/colblk_writer.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,17 @@ type RawColumnWriter struct {
102102
previousUserKey invariants.Value[[]byte]
103103
validator invariants.Value[*colblk.DataBlockValidator]
104104
disableKeyOrderChecks bool
105+
cpuMeasurer base.CPUMeasurer
105106
}
106107

107108
// Assert that *RawColumnWriter implements RawWriter.
108109
var _ RawWriter = (*RawColumnWriter)(nil)
109110

110-
func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumnWriter {
111+
// cpuMeasurer, if non-nil, is only used for calling
112+
// cpuMeasurer.MeasureCPUSSTableSecondary.
113+
func newColumnarWriter(
114+
writable objstorage.Writable, o WriterOptions, cpuMeasurer base.CPUMeasurer,
115+
) *RawColumnWriter {
111116
if writable == nil {
112117
panic("pebble: nil writable")
113118
}
@@ -177,6 +182,7 @@ func newColumnarWriter(writable objstorage.Writable, o WriterOptions) *RawColumn
177182

178183
w.writeQueue.ch = make(chan *compressedBlock)
179184
w.writeQueue.wg.Add(1)
185+
w.cpuMeasurer = cpuMeasurer
180186
go w.drainWriteQueue()
181187
return w
182188
}
@@ -863,10 +869,15 @@ func (w *RawColumnWriter) flushBufferedIndexBlocks() (rootIndex block.Handle, er
863869
// Other blocks are written directly by the client goroutine. See Close.
864870
func (w *RawColumnWriter) drainWriteQueue() {
865871
defer w.writeQueue.wg.Done()
872+
// Call once to initialize the CPU measurer.
873+
w.cpuMeasurer.MeasureCPU(base.CompactionGoroutineSSTableSecondary)
866874
for cb := range w.writeQueue.ch {
867875
if _, err := w.layout.WritePrecompressedDataBlock(cb.physical); err != nil {
868876
w.writeQueue.err = err
869877
}
878+
// Report to the CPU measurer immediately after writing (note that there
879+
// may be a time lag until the next block is available to write).
880+
w.cpuMeasurer.MeasureCPU(base.CompactionGoroutineSSTableSecondary)
870881
cb.blockBuf.clear()
871882
cb.physical = block.PhysicalBlock{}
872883
compressedBlockPool.Put(cb)

sstable/writer.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,22 @@ import (
1818
// NewRawWriter returns a new table writer for the file. Closing the writer will
1919
// close the file.
2020
func NewRawWriter(writable objstorage.Writable, o WriterOptions) RawWriter {
21+
return NewRawWriterWithCPUMeasurer(writable, o, base.NoopCPUMeasurer{})
22+
}
23+
24+
// NewRawWriterWithCPUMeasurer is like NewRawWriter, but additionally allows
25+
// the caller to specify a CPUMeasurer. Only
26+
// CPUMeasurer.MeasureCPU(CompactionGoroutineSSTableSecondary) is used by the
27+
// writer.
28+
func NewRawWriterWithCPUMeasurer(
29+
writable objstorage.Writable, o WriterOptions, cpuMeasurer base.CPUMeasurer,
30+
) RawWriter {
2131
if o.TableFormat <= TableFormatPebblev4 {
32+
// Don't bother plumbing the cpuMeasurer to the row writer since it is not
33+
// the default and will be removed.
2234
return newRowWriter(writable, o)
2335
}
24-
return newColumnarWriter(writable, o)
36+
return newColumnarWriter(writable, o, cpuMeasurer)
2537
}
2638

2739
// Writer is a table writer.

0 commit comments

Comments
 (0)