@@ -41,6 +41,8 @@ type cleanupManager struct {
41
41
jobsCh chan * cleanupJob
42
42
// waitGroup is used to wait for the background goroutine to exit.
43
43
waitGroup sync.WaitGroup
44
+ // stopCh is closed when the pacer is disabled after closing the cleanup manager.
45
+ stopCh chan struct {}
44
46
45
47
mu struct {
46
48
sync.Mutex
@@ -103,6 +105,7 @@ func openCleanupManager(
103
105
getDeletePacerInfo ,
104
106
),
105
107
jobsCh : make (chan * cleanupJob , jobsQueueDepth ),
108
+ stopCh : make (chan struct {}),
106
109
}
107
110
cm .mu .completedJobsCond .L = & cm .mu .Mutex
108
111
cm .waitGroup .Add (1 )
@@ -120,6 +123,7 @@ func openCleanupManager(
120
123
// Delete pacing is disabled for the remaining jobs.
121
124
func (cm * cleanupManager ) Close () {
122
125
close (cm .jobsCh )
126
+ close (cm .stopCh )
123
127
cm .waitGroup .Wait ()
124
128
}
125
129
@@ -174,21 +178,18 @@ func (cm *cleanupManager) Wait() {
174
178
func (cm * cleanupManager ) mainLoop () {
175
179
defer cm .waitGroup .Done ()
176
180
181
+ paceTimer := time .NewTimer (time .Duration (0 ))
182
+ defer paceTimer .Stop ()
183
+
177
184
var tb tokenbucket.TokenBucket
178
185
// Use a token bucket with 1 token / second refill rate and 1 token burst.
179
186
tb .Init (1.0 , 1.0 )
180
187
for job := range cm .jobsCh {
181
- for _ , of := range job .obsoleteFiles {
182
- switch of .fileType {
183
- case base .FileTypeTable :
184
- cm .maybePace (& tb , & of )
185
- cm .deleteObsoleteObject (of .fileType , job .jobID , of .fileNum )
186
- case base .FileTypeBlob :
187
- cm .maybePace (& tb , & of )
188
- cm .deleteObsoleteObject (of .fileType , job .jobID , of .fileNum )
189
- default :
190
- cm .deleteObsoleteFile (of .fs , of .fileType , job .jobID , of .path , of .fileNum )
191
- }
188
+ select {
189
+ case <- cm .stopCh :
190
+ cm .deleteObsoleteFilesInJob (job , nil , nil )
191
+ default :
192
+ cm .deleteObsoleteFilesInJob (job , & tb , paceTimer )
192
193
}
193
194
cm .mu .Lock ()
194
195
cm .mu .completedJobs ++
@@ -199,9 +200,31 @@ func (cm *cleanupManager) mainLoop() {
199
200
}
200
201
}
201
202
203
+ // deleteObsoleteFilesInJob deletes all obsolete files in the given job. If tb
204
+ // and paceTimer are provided, files that need pacing will be throttled
205
+ // according to the deletion rate. If tb is nil, files are deleted immediately
206
+ // without pacing (used when the cleanup manager is being closed).
207
+ func (cm * cleanupManager ) deleteObsoleteFilesInJob (
208
+ job * cleanupJob , tb * tokenbucket.TokenBucket , paceTimer * time.Timer ,
209
+ ) {
210
+ for _ , of := range job .obsoleteFiles {
211
+ switch of .fileType {
212
+ case base .FileTypeTable , base .FileTypeBlob :
213
+ if tb != nil {
214
+ cm .maybePace (tb , & of , paceTimer )
215
+ }
216
+ cm .deleteObsoleteObject (of .fileType , job .jobID , of .fileNum )
217
+ default :
218
+ cm .deleteObsoleteFile (of .fs , of .fileType , job .jobID , of .path , of .fileNum )
219
+ }
220
+ }
221
+ }
222
+
202
223
// maybePace sleeps before deleting an object if appropriate. It is always
203
224
// called from the background goroutine.
204
- func (cm * cleanupManager ) maybePace (tb * tokenbucket.TokenBucket , of * obsoleteFile ) {
225
+ func (cm * cleanupManager ) maybePace (
226
+ tb * tokenbucket.TokenBucket , of * obsoleteFile , paceTimer * time.Timer ,
227
+ ) {
205
228
if ! of .needsPacing () {
206
229
return
207
230
}
@@ -220,7 +243,12 @@ func (cm *cleanupManager) maybePace(tb *tokenbucket.TokenBucket, of *obsoleteFil
220
243
if ok {
221
244
break
222
245
}
223
- time .Sleep (d )
246
+ paceTimer .Reset (d )
247
+ select {
248
+ case <- paceTimer .C :
249
+ case <- cm .stopCh :
250
+ return
251
+ }
224
252
}
225
253
}
226
254
0 commit comments