@@ -33,10 +33,9 @@ type DeleteCleaner = base.DeleteCleaner
 type ArchiveCleaner = base.ArchiveCleaner
 
 type cleanupManager struct {
-	opts            *Options
-	objProvider     objstorage.Provider
-	onTableDeleteFn func(fileSize uint64, isLocal bool)
-	deletePacer     *deletionPacer
+	opts        *Options
+	objProvider objstorage.Provider
+	deletePacer *deletionPacer
 
 	// jobsCh is used as the cleanup job queue.
 	jobsCh chan *cleanupJob
@@ -47,12 +46,21 @@ type cleanupManager struct {
 		sync.Mutex
 		// totalJobs is the total number of enqueued jobs (completed or in progress).
 		totalJobs int
+		completedStats obsoleteObjectStats
 		completedJobs int
 		completedJobsCond sync.Cond
 		jobsQueueWarningIssued bool
 	}
 }
 
+// CompletedStats returns the stats summarizing objects deleted. The returned
+// stats increase monotonically over the lifetime of the DB.
+func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.mu.completedStats
+}
+
 // We can queue this many jobs before we have to block EnqueueJob.
 const jobsQueueDepth = 1000
 
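A quick usage sketch, not part of this change, of how the pieces above fit together: a caller enqueues a job along with its pre-computed stats, waits for the background goroutine to drain the queue, and then reads the monotonically increasing completed totals. It assumes EnqueueJob, Wait, and CompletedStats behave as shown elsewhere in this diff.

	// Hypothetical caller-side flow (variable names assumed).
	cm.EnqueueJob(jobID, filesToDelete, stats) // stats describe filesToDelete
	cm.Wait()                                  // presumed to block until queued jobs complete
	completed := cm.CompletedStats()           // now includes the enqueued stats
	_ = completed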
@@ -74,20 +82,17 @@ func (of *obsoleteFile) needsPacing() bool {
 type cleanupJob struct {
 	jobID         JobID
 	obsoleteFiles []obsoleteFile
+	stats         obsoleteObjectStats
 }
 
 // openCleanupManager creates a cleanupManager and starts its background goroutine.
 // The cleanupManager must be Close()d.
 func openCleanupManager(
-	opts *Options,
-	objProvider objstorage.Provider,
-	onTableDeleteFn func(fileSize uint64, isLocal bool),
-	getDeletePacerInfo func() deletionPacerInfo,
+	opts *Options, objProvider objstorage.Provider, getDeletePacerInfo func() deletionPacerInfo,
 ) *cleanupManager {
 	cm := &cleanupManager{
-		opts:            opts,
-		objProvider:     objProvider,
-		onTableDeleteFn: onTableDeleteFn,
+		opts:        opts,
+		objProvider: objProvider,
 		deletePacer: newDeletionPacer(
 			crtime.NowMono(),
 			opts.FreeSpaceThresholdBytes,
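For reference, a hedged sketch of the constructor call shape after this change: the onTableDeleteFn callback is gone, so construction only needs the options, the object provider, and the pacer-info callback. Variable names here are assumptions; getDeletionPacerInfo is the DB method that appears further down in this diff.

	// Hypothetical construction site.
	cm := openCleanupManager(opts, objProvider, d.getDeletionPacerInfo)
	// ... use cm ...
	cm.Close() // the manager must be Close()d, per the doc comment above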
@@ -119,10 +124,13 @@ func (cm *cleanupManager) Close() {
 }
 
 // EnqueueJob adds a cleanup job to the manager's queue.
-func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
+func (cm *cleanupManager) EnqueueJob(
+	jobID JobID, obsoleteFiles []obsoleteFile, stats obsoleteObjectStats,
+) {
 	job := &cleanupJob{
 		jobID:         jobID,
 		obsoleteFiles: obsoleteFiles,
+		stats:         stats,
 	}
 
 	// Report deleted bytes to the pacer, which can use this data to potentially
@@ -174,7 +182,6 @@ func (cm *cleanupManager) mainLoop() {
 		switch of.fileType {
 		case base.FileTypeTable:
 			cm.maybePace(&tb, &of)
-			cm.onTableDeleteFn(of.fileSize, of.isLocal)
 			cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
 		case base.FileTypeBlob:
 			cm.maybePace(&tb, &of)
@@ -185,6 +192,7 @@ func (cm *cleanupManager) mainLoop() {
 		}
 		cm.mu.Lock()
 		cm.mu.completedJobs++
+		cm.mu.completedStats.Add(job.stats)
 		cm.mu.completedJobsCond.Broadcast()
 		cm.maybeLogLocked()
 		cm.mu.Unlock()
@@ -320,18 +328,6 @@ func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
 	return pacerInfo
 }
 
-// onObsoleteTableDelete is called to update metrics when an sstable is deleted.
-func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
-	d.mu.Lock()
-	d.mu.versions.metrics.Table.ObsoleteCount--
-	d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
-	if isLocal {
-		d.mu.versions.metrics.Table.Local.ObsoleteCount--
-		d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
-	}
-	d.mu.Unlock()
-}
-
 // scanObsoleteFiles scans the filesystem for files that are no longer needed
 // and adds those to the internal lists of obsolete files. Note that the files
 // are not actually deleted by this method. A subsequent call to
@@ -448,7 +444,7 @@ func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlusha
 //
 // d.mu must be held when calling this method.
 func (d *DB) disableFileDeletions() {
-	d.mu.disableFileDeletions++
+	d.mu.fileDeletions.disableCount++
 	d.mu.Unlock()
 	defer d.mu.Lock()
 	d.cleanupManager.Wait()
@@ -459,11 +455,11 @@ func (d *DB) disableFileDeletions() {
 //
 // d.mu must be held when calling this method.
 func (d *DB) enableFileDeletions() {
-	if d.mu.disableFileDeletions <= 0 {
+	if d.mu.fileDeletions.disableCount <= 0 {
 		panic("pebble: file deletion disablement invariant violated")
 	}
-	d.mu.disableFileDeletions--
-	if d.mu.disableFileDeletions > 0 {
+	d.mu.fileDeletions.disableCount--
+	if d.mu.fileDeletions.disableCount > 0 {
 		return
 	}
 	d.deleteObsoleteFiles(d.newJobIDLocked())
@@ -478,7 +474,7 @@ type fileInfo = base.FileInfo
 // Does nothing if file deletions are disabled (see disableFileDeletions). A
 // cleanup job will be scheduled when file deletions are re-enabled.
 func (d *DB) deleteObsoleteFiles(jobID JobID) {
-	if d.mu.disableFileDeletions > 0 {
+	if d.mu.fileDeletions.disableCount > 0 {
 		return
 	}
 	_, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
@@ -524,6 +520,16 @@ func (d *DB) deleteObsoleteFiles(jobID JobID) {
 	obsoleteOptions := d.mu.versions.obsoleteOptions
 	d.mu.versions.obsoleteOptions = nil
 
+	// Compute the stats for the files being queued for deletion and add them to
+	// the running total. These stats will be used during DB.Metrics() to
+	// calculate the count and size of pending obsolete files by diffing these
+	// stats and the stats reported by the cleanup manager.
+	var objectStats obsoleteObjectStats
+	objectStats.tablesAll, objectStats.tablesLocal = calculateObsoleteObjectStats(obsoleteTables)
+	objectStats.blobFilesAll, objectStats.blobFilesLocal = calculateObsoleteObjectStats(obsoleteBlobs)
+	d.mu.fileDeletions.queuedStats.Add(objectStats)
+	d.mu.versions.updateObsoleteObjectMetricsLocked()
+
 	// Release d.mu while preparing the cleanup job and possibly waiting.
 	// Note the unusual order: Unlock and then Lock.
 	d.mu.Unlock()
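The comment in the hunk above describes deriving the pending obsolete objects by diffing two running totals. Here is a minimal sketch of that calculation, using only the types and methods introduced in this diff; it is not the actual updateObsoleteObjectMetricsLocked implementation, which this change does not show.

	// pendingObsoleteStats is hypothetical. Both inputs grow monotonically:
	// queuedStats grows when deleteObsoleteFiles enqueues a job, and the
	// cleanup manager's completed stats grow when a job finishes, so their
	// difference is what has been queued but not yet deleted.
	func pendingObsoleteStats(queued obsoleteObjectStats, cm *cleanupManager) obsoleteObjectStats {
		pending := queued
		pending.Sub(cm.CompletedStats())
		return pending
	}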
@@ -552,7 +558,7 @@ func (d *DB) deleteObsoleteFiles(jobID JobID) {
 		d.fileCache.Evict(f.fileNum, base.FileTypeBlob)
 	}
 	if len(filesToDelete) > 0 {
-		d.cleanupManager.EnqueueJob(jobID, filesToDelete)
+		d.cleanupManager.EnqueueJob(jobID, filesToDelete, objectStats)
 	}
 	if d.opts.private.testingAlwaysWaitForCleanup {
 		d.cleanupManager.Wait()
@@ -601,6 +607,54 @@ func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname st
 	}
 }
 
+func calculateObsoleteObjectStats(files []obsoleteFile) (total, local countAndSize) {
+	for _, of := range files {
+		if of.isLocal {
+			local.count++
+			local.size += of.fileSize
+		}
+		total.count++
+		total.size += of.fileSize
+	}
+	return total, local
+}
+
+type obsoleteObjectStats struct {
+	tablesLocal    countAndSize
+	tablesAll      countAndSize
+	blobFilesLocal countAndSize
+	blobFilesAll   countAndSize
+}
+
+func (s *obsoleteObjectStats) Add(other obsoleteObjectStats) {
+	s.tablesLocal.Add(other.tablesLocal)
+	s.tablesAll.Add(other.tablesAll)
+	s.blobFilesLocal.Add(other.blobFilesLocal)
+	s.blobFilesAll.Add(other.blobFilesAll)
+}
+
+func (s *obsoleteObjectStats) Sub(other obsoleteObjectStats) {
+	s.tablesLocal.Sub(other.tablesLocal)
+	s.tablesAll.Sub(other.tablesAll)
+	s.blobFilesLocal.Sub(other.blobFilesLocal)
+	s.blobFilesAll.Sub(other.blobFilesAll)
+}
+
+type countAndSize struct {
+	count uint64
+	size  uint64
+}
+
+func (c *countAndSize) Add(other countAndSize) {
+	c.count += other.count
+	c.size += other.size
+}
+
+func (c *countAndSize) Sub(other countAndSize) {
+	c.count = invariants.SafeSub(c.count, other.count)
+	c.size = invariants.SafeSub(c.size, other.size)
+}
+
 func makeZombieObjects() zombieObjects {
 	return zombieObjects{
 		objs: make(map[base.DiskFileNum]objectInfo),