Skip to content

Commit 3f21e7c

Browse files
committed
delete pacer: count in-queue bytes as obsolete bytes
Informs #5424
1 parent 65fe99f commit 3f21e7c

File tree

3 files changed

+65
-14
lines changed

3 files changed

+65
-14
lines changed

obsolete_files.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (cm *cleanupManager) EnqueueJob(
150150
}
151151
}
152152
if pacingBytes > 0 {
153-
cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
153+
cm.deletePacer.DeletionEnqueued(crtime.NowMono(), pacingBytes)
154154
}
155155

156156
cm.mu.Lock()
@@ -216,6 +216,9 @@ func (cm *cleanupManager) deleteObsoleteFilesInJob(
216216
cm.maybePace(tb, &of, paceTimer)
217217
}
218218
cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
219+
if of.needsPacing() {
220+
cm.deletePacer.DeletionPerformed(of.fileSize)
221+
}
219222
default:
220223
cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
221224
}

pacer.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@ import (
99
"time"
1010

1111
"github.com/cockroachdb/crlib/crtime"
12+
"github.com/cockroachdb/pebble/internal/invariants"
1213
)
1314

1415
// deletionPacerInfo contains any info from the db necessary to make deletion
1516
// pacing decisions (to limit background IO usage so that it does not contend
1617
// with foreground traffic).
1718
type deletionPacerInfo struct {
18-
freeBytes uint64
19+
freeBytes uint64
20+
// obsoleteBytes is the total size of obsolete files in the latest version;
21+
// these are files that have not yet been enqueued for deletion.
1922
obsoleteBytes uint64
2023
liveBytes uint64
2124
}
@@ -44,6 +47,10 @@ type deletionPacer struct {
4447
// history keeps rack of recent deletion history; it used to increase the
4548
// deletion rate to match the pace of deletions.
4649
history history
50+
51+
// bytesToDelete is the sum of pacing bytes for all deletions that were
52+
// reported but not yet performed.
53+
bytesToDelete uint64
4754
}
4855

4956
targetByteDeletionRate func() int
@@ -83,17 +90,33 @@ func newDeletionPacer(
8390
return d
8491
}
8592

86-
// ReportDeletion is used to report a deletion to the pacer. The pacer uses it
87-
// to keep track of the recent rate of deletions and potentially increase the
88-
// deletion rate accordingly.
93+
// DeletionEnqueued is used to report a new deletion request to the pacer. The
94+
// pacer uses it to keep track of the recent rate of deletions and potentially
95+
// increase the deletion rate accordingly.
8996
//
90-
// ReportDeletion is thread-safe.
91-
func (p *deletionPacer) ReportDeletion(now crtime.Mono, bytesToDelete uint64) {
97+
// DeletionEnqueued is thread-safe.
98+
func (p *deletionPacer) DeletionEnqueued(now crtime.Mono, bytesToDelete uint64) {
9299
p.mu.Lock()
93100
defer p.mu.Unlock()
101+
p.mu.bytesToDelete += bytesToDelete
94102
p.mu.history.Add(now, int64(bytesToDelete))
95103
}
96104

105+
// DeletionPerformed is used to report that a deletion was performed. The pacer
106+
// uses it to keep track of how many bytes are in the queue.
107+
func (p *deletionPacer) DeletionPerformed(bytesToDelete uint64) {
108+
p.mu.Lock()
109+
defer p.mu.Unlock()
110+
if p.mu.bytesToDelete < bytesToDelete {
111+
if invariants.Enabled {
112+
panic("underflow")
113+
}
114+
p.mu.bytesToDelete = 0
115+
} else {
116+
p.mu.bytesToDelete -= bytesToDelete
117+
}
118+
}
119+
97120
// PacingDelay returns the recommended pacing wait time (in seconds) for
98121
// deleting the given number of bytes.
99122
//
@@ -108,10 +131,10 @@ func (p *deletionPacer) PacingDelay(now crtime.Mono, bytesToDelete uint64) (wait
108131
baseRate := float64(targetByteDeletionRate)
109132
// If recent deletion rate is more than our target, use that so that we don't
110133
// fall behind.
111-
historicRate := func() float64 {
134+
historicRate, totalBytesToDelete := func() (float64, uint64) {
112135
p.mu.Lock()
113136
defer p.mu.Unlock()
114-
return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds()
137+
return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds(), p.mu.bytesToDelete
115138
}()
116139
if historicRate > baseRate {
117140
baseRate = historicRate
@@ -128,7 +151,7 @@ func (p *deletionPacer) PacingDelay(now crtime.Mono, bytesToDelete uint64) (wait
128151
// We don't know the obsolete bytes ratio. Disable pacing altogether.
129152
return 0.0
130153
}
131-
obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes)
154+
obsoleteBytesRatio := float64(info.obsoleteBytes+totalBytesToDelete) / float64(info.liveBytes)
132155
if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio {
133156
// Increase the rate so that we can free up enough bytes within the timeframe.
134157
r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds()

pacer_test.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ func TestDeletionPacer(t *testing.T) {
2929
// history of deletion reporting; first value in the pair is the time,
3030
// second value is the deleted bytes. The time of pacing is the same as the
3131
// last time in the history.
32-
history [][2]int64
32+
history [][2]int64
33+
bytesInQueue uint64
3334
// expected pacing rate in MB/s.
3435
expected float64
3536
}{
@@ -73,7 +74,7 @@ func TestDeletionPacer(t *testing.T) {
7374
{
7475
freeBytes: 160 * GB,
7576
obsoleteBytes: 1 * MB,
76-
liveBytes: 160 * MB,
77+
liveBytes: 1000 * GB,
7778
history: [][2]int64{{0, 5 * 60 * 200 * MB}},
7879
expected: 200.0,
7980
},
@@ -82,7 +83,7 @@ func TestDeletionPacer(t *testing.T) {
8283
{
8384
freeBytes: 6 * GB,
8485
obsoleteBytes: 1 * MB,
85-
liveBytes: 160 * MB,
86+
liveBytes: 100 * GB,
8687
history: [][2]int64{{0, 5 * 60 * 200 * MB}},
8788
expected: 1224.0,
8889
},
@@ -95,6 +96,27 @@ func TestDeletionPacer(t *testing.T) {
9596
history: [][2]int64{{0, 5 * 60 * 200 * MB}},
9697
expected: 302.4,
9798
},
99+
// History shows 200MB/sec deletions on average over last 5 minutes and
100+
// obsoleteBytesRatio is 50%, from bytes still in queue.
101+
{
102+
freeBytes: 500 * GB,
103+
obsoleteBytes: 0 * GB,
104+
liveBytes: 100 * GB,
105+
history: [][2]int64{{0, 5 * 60 * 200 * MB}},
106+
bytesInQueue: 50 * GB,
107+
expected: 302.4,
108+
},
109+
// History shows 200MB/sec deletions on average over last 5 minutes and
110+
// obsoleteBytesRatio is 50%, from a combination of obsoleteBytes and bytes
111+
// still in queue.
112+
{
113+
freeBytes: 500 * GB,
114+
obsoleteBytes: 25 * GB,
115+
liveBytes: 100 * GB,
116+
history: [][2]int64{{0, 5 * 60 * 200 * MB}},
117+
bytesInQueue: 25 * GB,
118+
expected: 302.4,
119+
},
98120
// History shows 1000MB/sec deletions on average over last 5 minutes.
99121
{
100122
freeBytes: 160 * GB,
@@ -134,10 +156,13 @@ func TestDeletionPacer(t *testing.T) {
134156
opts.ObsoleteBytesTimeframe,
135157
getInfo,
136158
)
159+
var inQueue uint64
137160
for _, h := range tc.history {
138161
last = start + crtime.Mono(time.Second*time.Duration(h[0]))
139-
pacer.ReportDeletion(last, uint64(h[1]))
162+
pacer.DeletionEnqueued(last, uint64(h[1]))
163+
inQueue += uint64(h[1])
140164
}
165+
pacer.DeletionPerformed(inQueue - tc.bytesInQueue)
141166
result := 1.0 / pacer.PacingDelay(last, 1*MB)
142167
require.InDelta(t, tc.expected, result, 1e-7)
143168
})

0 commit comments

Comments
 (0)