Commit 8a7840b

deletepacer: don't use fifo.Queue

There is an msan failure that goes away if we switch to using a slice here. This isn't due to a bug in `fifo.Queue`; it looks like a possible compiler bug, which will continue to be investigated as part of golang/go#76138.

We don't really need the specialized `fifo.Queue` here; a slice is just as good, as long as we make sure to allocate in reasonable chunks.

Fixes #5498
1 parent 41b4919
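For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of a slice used as a FIFO queue with chunked allocation, in the spirit of the change below. The `sliceQueue` type and its methods are hypothetical names invented for this example; apart from the standard-library `slices.Grow`, none of them appear in the Pebble code.

// Illustrative sketch only: sliceQueue and its methods are made up for this
// example; slices.Grow is the standard-library call used by the commit itself.
package main

import (
	"fmt"
	"slices"
)

type sliceQueue[T any] struct {
	items []T
}

// push appends an entry. When the backing array is full, it grows capacity in
// a large chunk so that subsequent pushes don't reallocate for a while.
func (q *sliceQueue[T]) push(v T) {
	if len(q.items) == cap(q.items) {
		q.items = slices.Grow(q.items, 1000)
	}
	q.items = append(q.items, v)
}

// popFront removes and returns the oldest entry by reslicing; the backing
// array is reused until a later push reallocates it.
func (q *sliceQueue[T]) popFront() (T, bool) {
	var zero T
	if len(q.items) == 0 {
		return zero, false
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v, true
}

func main() {
	var q sliceQueue[string]
	q.push("a.sst")
	q.push("b.sst")
	for v, ok := q.popFront(); ok; v, ok = q.popFront() {
		fmt.Println(v) // prints a.sst, then b.sst
	}
}

Growing by a fixed chunk (1000 entries, matching the constant in the diff) bounds how often pushes reallocate, which roughly recovers the amortized behavior of the pooled `fifo.Queue`.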

2 files changed: +15 -13 lines

internal/deletepacer/delete_pacer.go

Lines changed: 14 additions & 12 deletions
@@ -7,11 +7,11 @@ package deletepacer
 import (
 	"context"
 	"runtime/pprof"
+	"slices"
 	"sync"
 	"time"
 
 	"github.com/cockroachdb/crlib/crtime"
-	"github.com/cockroachdb/crlib/fifo"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/invariants"
 	"github.com/cockroachdb/pebble/metrics"
@@ -54,7 +54,7 @@ type DeletePacer struct {
 	mu struct {
 		sync.Mutex
 
-		queue             fifo.Queue[queueEntry]
+		queue             []queueEntry
 		queuedPacingBytes uint64
 		// queuedHistory keeps track of pacing bytes added to the queue within the
 		// last 5 minutes.
@@ -100,7 +100,6 @@ func Open(
 		deleteFn: deleteFn,
 		notifyCh: make(chan struct{}, 1),
 	}
-	dp.mu.queue = fifo.MakeQueue(&queueBackingPool)
 	dp.mu.queuedHistory.Init(crtime.NowMono(), RecentRateWindow)
 	dp.mu.deletedCond.L = &dp.mu.Mutex
 	dp.waitGroup.Add(1)
@@ -112,8 +111,6 @@ func Open(
 	return dp
 }
 
-var queueBackingPool = fifo.MakeQueueBackingPool[queueEntry]()
-
 type queueEntry struct {
 	ObsoleteFile
 	JobID int
@@ -159,17 +156,17 @@ func (dp *DeletePacer) mainLoop() {
 	dp.mu.Lock()
 	defer dp.mu.Unlock()
 	for {
-		if dp.mu.closed && dp.mu.queue.Len() == 0 {
+		if dp.mu.closed && len(dp.mu.queue) == 0 {
 			return
 		}
 		now := crtime.NowMono()
 		disablePacing := dp.mu.closed
-		if dp.mu.queue.Len() > maxQueueSize {
+		if len(dp.mu.queue) > maxQueueSize {
 			// The queue is getting out of hand; disable pacing.
 			disablePacing = true
 			if lastMaxQueueLog == 0 || now.Sub(lastMaxQueueLog) > time.Minute {
 				lastMaxQueueLog = now
-				dp.logger.Errorf("excessive delete pacer queue size %d; pacing temporarily disabled", dp.mu.queue.Len())
+				dp.logger.Errorf("excessive delete pacer queue size %d; pacing temporarily disabled", len(dp.mu.queue))
 			}
 		}
 
@@ -180,7 +177,7 @@ func (dp *DeletePacer) mainLoop() {
 		// 2. Wait for pacing debt to clear;
 		// 3. Otherwise, delete next file.
 		switch {
-		case dp.mu.queue.Len() == 0:
+		case len(dp.mu.queue) == 0:
 			// Nothing to do.
 			dp.mu.Unlock()
 			<-dp.notifyCh
@@ -203,8 +200,8 @@ func (dp *DeletePacer) mainLoop() {
 
 		default:
 			// Delete a file.
-			file := *dp.mu.queue.PeekFront()
-			dp.mu.queue.PopFront()
+			file := dp.mu.queue[0]
+			dp.mu.queue = dp.mu.queue[1:]
 			if b := file.pacingBytes(); b != 0 {
 				dp.mu.queuedPacingBytes = invariants.SafeSub(dp.mu.queuedPacingBytes, b)
 				rateCalc.AddDebt(b)
@@ -238,7 +235,12 @@ func (dp *DeletePacer) Enqueue(jobID int, files ...ObsoleteFile) {
 			dp.mu.queuedHistory.Add(now, b)
 		}
 		dp.mu.metrics.InQueue.Inc(file.FileType, file.FileSize, file.Placement)
-		dp.mu.queue.PushBack(queueEntry{
+		if len(dp.mu.queue) == cap(dp.mu.queue) {
+			// If we have to allocate, make sure we don't have to allocate again for a
+			// while.
+			dp.mu.queue = slices.Grow(dp.mu.queue, 1000)
+		}
+		dp.mu.queue = append(dp.mu.queue, queueEntry{
			ObsoleteFile: file,
			JobID:        jobID,
		})
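One general Go detail worth noting about the `dp.mu.queue = dp.mu.queue[1:]` pop in the hunk above (standard slice semantics, not anything specific to this change): reslicing only moves the slice header forward, so the popped entry remains reachable through the shared backing array until a later append reallocates it. This is a known tradeoff of a plain slice compared with a pooled queue such as `fifo.Queue`. A tiny standalone illustration:

// Standalone illustration of pop-by-reslicing; values are arbitrary.
package main

import "fmt"

func main() {
	q := make([]int, 0, 4)
	q = append(q, 1, 2, 3)
	front := q[0]
	q = q[1:]              // pop front: the slice header moves, the array stays
	fmt.Println(front, q)  // 1 [2 3]
	q = append(q, 4)       // still fits in the original backing array
	fmt.Println(cap(q), q) // 3 [2 3 4]
}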

internal/deletepacer/delete_pacer_test.go

Lines changed: 1 addition & 1 deletion
@@ -248,7 +248,7 @@ func TestFallingBehind(t *testing.T) {
 	queueSize := func() int {
 		dp.mu.Lock()
 		defer dp.mu.Unlock()
-		return dp.mu.queue.Len()
+		return len(dp.mu.queue)
 	}
 	// At 1MB, each job will take 100ms each. Note that the rate increase based on
 	// history won't make much difference, since the enqueued size is averaged

0 commit comments
