Commit 6c342d8

db: add fail-safe to the delete pacer
When we reach 3/4 of the capacity of the deletion jobs channel, we disable
pacing. This should act as a catch-all if the delete pacing logic fails to
keep up.

Informs #5424

1 parent 7b8c50b
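
As a rough illustration of the fail-safe (a minimal, self-contained sketch, not Pebble's actual API; the pacer type, pacingDelay method, and job struct below are made up for this example): once the buffered jobs channel reaches the high threshold, the pacing delay drops to zero, so deletions run at whatever rate jobs are enqueued until the backlog drains.

package main

import (
	"fmt"
	"time"
)

const (
	jobsQueueDepth         = 1000
	jobsQueueHighThreshold = jobsQueueDepth * 3 / 4
	jobsQueueLowThreshold  = jobsQueueDepth / 10 // recovery threshold, used for logging in the real change
)

// job stands in for a queued deletion; only its size matters for pacing.
type job struct{ fileSize uint64 }

// pacer is a simplified stand-in for the cleanup manager's delete pacer.
type pacer struct {
	jobsCh      chan job // buffered with capacity jobsQueueDepth
	bytesPerSec float64  // target deletion rate while pacing is active
}

// pacingDelay returns how long to wait before deleting a file of the given
// size. When the queue is at or above the high threshold, it returns 0,
// disabling pacing until the backlog drains (the fail-safe).
func (p *pacer) pacingDelay(fileSize uint64) time.Duration {
	if len(p.jobsCh) >= jobsQueueHighThreshold {
		return 0
	}
	return time.Duration(float64(fileSize) / p.bytesPerSec * float64(time.Second))
}

func main() {
	p := &pacer{jobsCh: make(chan job, jobsQueueDepth), bytesPerSec: 10 << 20}

	// Near-empty queue: a 1 MB file is paced (~100ms at 10 MB/s).
	fmt.Println("paced:", p.pacingDelay(1<<20))

	// Fill the queue to the high threshold: pacing is disabled.
	for i := 0; i < jobsQueueHighThreshold; i++ {
		p.jobsCh <- job{fileSize: 1}
	}
	fmt.Println("fail-safe:", p.pacingDelay(1<<20))
}

In the change itself, the check sits at the top of cleanupManager.maybePace, and maybeLogLocked reuses the same jobsQueueHighThreshold/jobsQueueLowThreshold constants for its "falling behind"/"back to normal" log messages, as shown in the obsolete_files.go diff below.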

2 files changed: +86 -7 lines changed

obsolete_files.go

Lines changed: 11 additions & 7 deletions
@@ -65,6 +65,8 @@ func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
 
 // We can queue this many jobs before we have to block EnqueueJob.
 const jobsQueueDepth = 1000
+const jobsQueueHighThreshold = jobsQueueDepth * 3 / 4
+const jobsQueueLowThreshold = jobsQueueDepth / 10
 
 // obsoleteFile holds information about a file that needs to be deleted soon.
 type obsoleteFile struct {
@@ -223,7 +225,11 @@ func (cm *cleanupManager) maybePace(
 	if !of.needsPacing() {
 		return
 	}
-
+	if len(cm.jobsCh) >= jobsQueueHighThreshold {
+		// If there are many jobs queued up, disable pacing. In this state, we
+		// execute deletion jobs at the same rate as new jobs get queued.
+		return
+	}
 	tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), of.fileSize)
 	if tokens == 0.0 {
 		// The token bucket might be in debt; it could make us wait even for 0
@@ -321,19 +327,17 @@ func (cm *cleanupManager) deleteObsoleteObject(
 //
 // Must be called with cm.mu locked.
 func (cm *cleanupManager) maybeLogLocked() {
-	const highThreshold = jobsQueueDepth * 3 / 4
-	const lowThreshold = jobsQueueDepth / 10
 
 	jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
 
-	if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
+	if !cm.mu.jobsQueueWarningIssued && jobsInQueue > jobsQueueHighThreshold {
 		cm.mu.jobsQueueWarningIssued = true
-		cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
+		cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", jobsQueueHighThreshold)
 	}
 
-	if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
+	if cm.mu.jobsQueueWarningIssued && jobsInQueue < jobsQueueLowThreshold {
 		cm.mu.jobsQueueWarningIssued = false
-		cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
+		cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", jobsQueueLowThreshold)
 	}
 }
 

obsolete_files_test.go

Lines changed: 75 additions & 0 deletions
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"sort"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -195,3 +196,77 @@ func TestCleanupManagerCloseWithPacing(t *testing.T) {
 		t.Fatalf("timed out waiting for cleanupManager.Close() to return")
 	}
 }
+
+// TestCleanupManagerFallingBehind verifies that we disable pacing when the jobs
+// channel reaches the high threshold.
+func TestCleanupManagerFallingBehind(t *testing.T) {
+	mem := vfs.NewMem()
+	var rate atomic.Int32
+	rate.Store(10 * MB) // 10 MB/s
+	opts := &Options{
+		FS:                      mem,
+		FreeSpaceThresholdBytes: 1,
+		TargetByteDeletionRate:  func() int { return int(rate.Load()) },
+	}
+	opts.EnsureDefaults()
+
+	objProvider, err := objstorageprovider.Open(objstorageprovider.Settings{
+		FS:        mem,
+		FSDirName: "/",
+	})
+	require.NoError(t, err)
+	defer objProvider.Close()
+
+	getDeletePacerInfo := func() deletionPacerInfo {
+		return deletionPacerInfo{
+			freeBytes: 10 * GB,
+			liveBytes: 10 * GB,
+		}
+	}
+
+	cm := openCleanupManager(opts, objProvider, getDeletePacerInfo)
+
+	x := 0
+	addJob := func(fileSize int) {
+		x++
+		cm.EnqueueJob(1, []obsoleteFile{{
+			fileType: base.FileTypeTable,
+			fs:       mem,
+			path:     fmt.Sprintf("test%02d.sst", x),
+			fileNum:  base.DiskFileNum(x),
+			fileSize: uint64(fileSize),
+			isLocal:  true,
+		}}, obsoleteObjectStats{})
+	}
+
+	for range jobsQueueLowThreshold {
+		addJob(1 * MB)
+	}
+	// At 1MB, each job will take 100ms. Note that the rate increase based on
+	// history won't make much difference, since the enqueued size is averaged
+	// over 5 minutes.
+	time.Sleep(50 * time.Millisecond)
+	require.Greater(t, len(cm.jobsCh), jobsQueueLowThreshold/2)
+	t.Logf("%d", len(cm.jobsCh))
+
+	// Add enough jobs to exceed the high threshold. We add small jobs so that the
+	// historic rate doesn't grow significantly.
+	require.Greater(t, jobsQueueDepth, jobsQueueHighThreshold+jobsQueueLowThreshold)
+	for range jobsQueueHighThreshold {
+		addJob(1)
+	}
+
+	for i := 0; ; i++ {
+		time.Sleep(10 * time.Millisecond)
+		if len(cm.jobsCh) <= jobsQueueHighThreshold {
+			break
+		}
+		if i == 1000 {
+			t.Fatalf("jobs channel length never dropped below high threshold (%d vs %d)", len(cm.jobsCh), jobsQueueHighThreshold)
+		}
+	}
+	// Set a high rate so the rest of the jobs finish quickly.
+	rate.Store(1 * GB)
+	cm.Close()
+}
