@@ -369,6 +369,10 @@ func (c *compaction) userKeyBounds() base.UserKeyBounds {
369
369
370
370
type getValueSeparation func (JobID , * compaction , sstable.TableFormat ) compact.ValueSeparation
371
371
372
+ // newCompaction constructs a compaction from the provided picked compaction.
373
+ //
374
+ // The compaction is created with a reference to its version that must be
375
+ // released when the compaction is complete.
372
376
func newCompaction (
373
377
pc * pickedCompaction ,
374
378
opts * Options ,
@@ -397,6 +401,21 @@ func newCompaction(
397
401
grantHandle : grantHandle ,
398
402
tableFormat : tableFormat ,
399
403
}
404
+ // Acquire a reference to the version to ensure that files and in-memory
405
+ // version state necessary for reading files remain available. Ignoring
406
+ // excises, this isn't strictly necessary for reading the sstables that are
407
+ // inputs to the compaction because those files are 'marked as compacting'
408
+ // and shouldn't be subject to any competing compactions. However with
409
+ // excises, a concurrent excise may remove a compaction's file from the
410
+ // Version and then cancel the compaction. The file shouldn't be physically
411
+ // removed until the cancelled compaction stops reading it.
412
+ //
413
+ // Additionally, we need any blob files referenced by input sstables to
414
+ // remain available, even if the blob file is rewritten. Maintaining a
415
+ // reference ensures that all these files remain available for the
416
+ // compaction's reads.
417
+ c .version .Ref ()
418
+
400
419
c .startLevel = & c .inputs [0 ]
401
420
if pc .startLevel .l0SublevelInfo != nil {
402
421
c .startLevel .l0SublevelInfo = pc .startLevel .l0SublevelInfo
@@ -501,6 +520,11 @@ func (c *compaction) maybeSwitchToMoveOrCopy(
501
520
}
502
521
}
503
522
523
+ // newDeleteOnlyCompaction constructs a delete-only compaction from the provided
524
+ // inputs.
525
+ //
526
+ // The compaction is created with a reference to its version that must be
527
+ // released when the compaction is complete.
504
528
func newDeleteOnlyCompaction (
505
529
opts * Options ,
506
530
cur * manifest.Version ,
@@ -523,6 +547,20 @@ func newDeleteOnlyCompaction(
523
547
exciseEnabled : exciseEnabled ,
524
548
grantHandle : noopGrantHandle {},
525
549
}
550
+ // Acquire a reference to the version to ensure that files and in-memory
551
+ // version state necessary for reading files remain available. Ignoring
552
+ // excises, this isn't strictly necessary for reading the sstables that are
553
+ // inputs to the compaction because those files are 'marked as compacting'
554
+ // and shouldn't be subject to any competing compactions. However with
555
+ // excises, a concurrent excise may remove a compaction's file from the
556
+ // Version and then cancel the compaction. The file shouldn't be physically
557
+ // removed until the cancelled compaction stops reading it.
558
+ //
559
+ // Additionally, we need any blob files referenced by input sstables to
560
+ // remain available, even if the blob file is rewritten. Maintaining a
561
+ // reference ensures that all these files remain available for the
562
+ // compaction's reads.
563
+ c .version .Ref ()
526
564
527
565
// Set c.smallest, c.largest.
528
566
files := make ([]iter.Seq [* manifest.TableMetadata ], 0 , len (inputs ))
@@ -600,6 +638,17 @@ func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64)
600
638
}
601
639
}
602
640
641
+ // newFlush creates the state necessary for a flush (modeled with the compaction
642
+ // struct).
643
+ //
644
+ // newFlush takes the current Version in order to populate grandparent flushing
645
+ // limits, but it does not reference the version.
646
+ //
647
+ // TODO(jackson): Consider maintaining a reference to the version anyway since
648
+ // in the future in-memory Version state may only be available while a Version
649
+ // is referenced (eg, if we start recycling B-Tree nodes once they're no longer
650
+ // referenced). There's subtlety around unref'ing the version at the right
651
+ // moment, so we defer it for now.
603
652
func newFlush (
604
653
opts * Options ,
605
654
cur * manifest.Version ,
@@ -1341,7 +1390,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1341
1390
1342
1391
// Finding the target level for ingestion must use the latest version
1343
1392
// after the logLock has been acquired.
1344
- c . version = d .mu .versions .currentVersion ()
1393
+ version : = d .mu .versions .currentVersion ()
1345
1394
1346
1395
baseLevel := d .mu .versions .picker .getBaseLevel ()
1347
1396
ve := & manifest.VersionEdit {}
@@ -1380,7 +1429,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1380
1429
logger : d .opts .Logger ,
1381
1430
Category : categoryIngest ,
1382
1431
},
1383
- v : c . version ,
1432
+ v : version ,
1384
1433
}
1385
1434
replacedTables := make (map [base.TableNum ][]manifest.NewTableEntry )
1386
1435
for _ , file := range ingestFlushable .files {
@@ -1426,7 +1475,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1426
1475
if ingestFlushable .exciseSpan .Valid () {
1427
1476
exciseBounds := ingestFlushable .exciseSpan .UserKeyBounds ()
1428
1477
// Iterate through all levels and find files that intersect with exciseSpan.
1429
- for layer , ls := range c . version .AllLevelsAndSublevels () {
1478
+ for layer , ls := range version .AllLevelsAndSublevels () {
1430
1479
for m := range ls .Overlaps (d .cmp , ingestFlushable .exciseSpan .UserKeyBounds ()).All () {
1431
1480
leftTable , rightTable , err := d .exciseTable (context .TODO (), exciseBounds , m , layer .Level (), tightExciseBounds )
1432
1481
if err != nil {
@@ -2406,23 +2455,43 @@ func (d *DB) compactionPprofLabels(c *compaction) pprof.LabelSet {
2406
2455
// compact runs one compaction and maybe schedules another call to compact.
2407
2456
func (d * DB ) compact (c * compaction , errChannel chan error ) {
2408
2457
pprof .Do (context .Background (), d .compactionPprofLabels (c ), func (context.Context ) {
2409
- d .mu .Lock ()
2410
- c .grantHandle .Started ()
2411
- if err := d .compact1 (c , errChannel ); err != nil {
2412
- d .handleCompactFailure (c , err )
2413
- }
2414
- if c .isDownload {
2415
- d .mu .compact .downloadingCount --
2416
- } else {
2417
- d .mu .compact .compactingCount --
2418
- }
2419
- delete (d .mu .compact .inProgress , c )
2420
- // Add this compaction's duration to the cumulative duration. NB: This
2421
- // must be atomic with the above removal of c from
2422
- // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
2423
- // miss or double count a completing compaction's duration.
2424
- d .mu .compact .duration += d .timeNow ().Sub (c .beganAt )
2425
- d .mu .Unlock ()
2458
+ func () {
2459
+ d .mu .Lock ()
2460
+ defer d .mu .Unlock ()
2461
+ jobID := d .newJobIDLocked ()
2462
+
2463
+ c .grantHandle .Started ()
2464
+ compactErr := d .compact1 (jobID , c )
2465
+ // The version stored in the compaction is ref'd when the
2466
+ // compaction is created. We're responsible for unreferencing it
2467
+ // when the compaction is complete.
2468
+ //
2469
+ // Unreferencing the version may have accumulated obsolete
2470
+ // files, so we schedule a deletion of obsolete files.
2471
+ c .version .UnrefLocked ()
2472
+ d .deleteObsoleteFiles (jobID )
2473
+ // We send on the error channel only after we've deleted
2474
+ // obsolete files so that tests performing manual compactions
2475
+ // block until the obsolete files are deleted, and the test
2476
+ // observes the deletion.
2477
+ if errChannel != nil {
2478
+ errChannel <- compactErr
2479
+ }
2480
+ if compactErr != nil {
2481
+ d .handleCompactFailure (c , compactErr )
2482
+ }
2483
+ if c .isDownload {
2484
+ d .mu .compact .downloadingCount --
2485
+ } else {
2486
+ d .mu .compact .compactingCount --
2487
+ }
2488
+ delete (d .mu .compact .inProgress , c )
2489
+ // Add this compaction's duration to the cumulative duration. NB: This
2490
+ // must be atomic with the above removal of c from
2491
+ // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
2492
+ // miss or double count a completing compaction's duration.
2493
+ d .mu .compact .duration += d .timeNow ().Sub (c .beganAt )
2494
+ }()
2426
2495
// Done must not be called while holding any lock that needs to be
2427
2496
// acquired by Schedule. Also, it must be called after new Version has
2428
2497
// been installed, and metadata related to compactingCount and inProgress
@@ -2440,10 +2509,12 @@ func (d *DB) compact(c *compaction, errChannel chan error) {
2440
2509
// scheduled, so we also need to call maybeScheduleCompaction. And
2441
2510
// maybeScheduleCompaction encompasses all compactions, and not only those
2442
2511
// scheduled via the CompactionScheduler.
2443
- d .mu .Lock ()
2444
- d .maybeScheduleCompaction ()
2445
- d .mu .compact .cond .Broadcast ()
2446
- d .mu .Unlock ()
2512
+ func () {
2513
+ d .mu .Lock ()
2514
+ defer d .mu .Unlock ()
2515
+ d .maybeScheduleCompaction ()
2516
+ d .mu .compact .cond .Broadcast ()
2517
+ }()
2447
2518
})
2448
2519
}
2449
2520
@@ -2538,14 +2609,7 @@ func (d *DB) cleanupVersionEdit(ve *manifest.VersionEdit) {
2538
2609
//
2539
2610
// d.mu must be held when calling this, but the mutex may be dropped and
2540
2611
// re-acquired during the course of this method.
2541
- func (d * DB ) compact1 (c * compaction , errChannel chan error ) (err error ) {
2542
- if errChannel != nil {
2543
- defer func () {
2544
- errChannel <- err
2545
- }()
2546
- }
2547
-
2548
- jobID := d .newJobIDLocked ()
2612
+ func (d * DB ) compact1 (jobID JobID , c * compaction ) (err error ) {
2549
2613
info := c .makeInfo (jobID )
2550
2614
d .opts .EventListener .CompactionBegin (info )
2551
2615
startTime := d .timeNow ()
@@ -2610,7 +2674,6 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
2610
2674
d .updateReadStateLocked (d .opts .DebugCheck )
2611
2675
d .updateTableStatsLocked (ve .NewTables )
2612
2676
}
2613
- d .deleteObsoleteFiles (jobID )
2614
2677
2615
2678
return err
2616
2679
}
@@ -2689,13 +2752,6 @@ func (d *DB) runCopyCompaction(
2689
2752
newMeta .InitPhysicalBacking ()
2690
2753
}
2691
2754
2692
- // Before dropping the db mutex, grab a ref to the current version. This
2693
- // prevents any concurrent excises from deleting files that this compaction
2694
- // needs to read/maintain a reference to.
2695
- vers := d .mu .versions .currentVersion ()
2696
- vers .Ref ()
2697
- defer vers .UnrefLocked ()
2698
-
2699
2755
// NB: The order here is reversed, lock after unlock. This is similar to
2700
2756
// runCompaction.
2701
2757
d .mu .Unlock ()
@@ -3051,15 +3107,6 @@ func (d *DB) runCompaction(
3051
3107
}
3052
3108
switch c .kind {
3053
3109
case compactionKindDeleteOnly :
3054
- // Before dropping the db mutex, grab a ref to the current version. This
3055
- // prevents any concurrent excises from deleting files that this compaction
3056
- // needs to read/maintain a reference to.
3057
- //
3058
- // Note that delete-only compactions can call excise(), which needs to be able
3059
- // to read these files.
3060
- vers := d .mu .versions .currentVersion ()
3061
- vers .Ref ()
3062
- defer vers .UnrefLocked ()
3063
3110
// Release the d.mu lock while doing I/O.
3064
3111
// Note the unusual order: Unlock and then Lock.
3065
3112
snapshots := d .mu .snapshots .toSlice ()
@@ -3076,18 +3123,6 @@ func (d *DB) runCompaction(
3076
3123
3077
3124
snapshots := d .mu .snapshots .toSlice ()
3078
3125
3079
- if c .flushing == nil {
3080
- // Before dropping the db mutex, grab a ref to the current version. This
3081
- // prevents any concurrent excises from deleting files that this compaction
3082
- // needs to read/maintain a reference to.
3083
- //
3084
- // Note that unlike user iterators, compactionIter does not maintain a ref
3085
- // of the version or read state.
3086
- vers := d .mu .versions .currentVersion ()
3087
- vers .Ref ()
3088
- defer vers .UnrefLocked ()
3089
- }
3090
-
3091
3126
// Release the d.mu lock while doing I/O.
3092
3127
// Note the unusual order: Unlock and then Lock.
3093
3128
d .mu .Unlock ()
0 commit comments