@@ -339,7 +339,6 @@ func newPickedCompactionFromL0(
339
339
340
340
pc := newPickedTableCompaction (opts , vers , l0Organizer , 0 , outputLevel , baseLevel )
341
341
pc .lcf = lcf
342
- pc .outputLevel .level = outputLevel
343
342
344
343
// Manually build the compaction as opposed to calling
345
344
// pickAutoHelper. This is because L0Sublevels has already added
@@ -416,11 +415,12 @@ func (pc *pickedTableCompaction) clone() *pickedTableCompaction {
416
415
// pc.outputLevel. It returns false if a concurrent compaction is occurring on the start or
417
416
// output level files. Note that inputLevel is not necessarily pc.startLevel. In multiLevel
418
417
// compactions, inputs are set by calling setupInputs once for each adjacent pair of levels.
419
- // This will preserve level invariants when expanding the compaction. pc.smallest and pc.largest
420
- // will be updated to reflect the key range of the inputs.
418
+ // This will preserve level invariants when expanding the compaction. pc.bounds will be updated
419
+ // to reflect the key range of the inputs.
421
420
func (pc * pickedTableCompaction ) setupInputs (
422
421
opts * Options ,
423
422
diskAvailBytes uint64 ,
423
+ inProgressCompactions []compactionInfo ,
424
424
inputLevel * compactionLevel ,
425
425
problemSpans * problemspans.ByLevel ,
426
426
) bool {
@@ -462,7 +462,8 @@ func (pc *pickedTableCompaction) setupInputs(
462
462
// If L0 is involved, it should always be the startLevel of the compaction.
463
463
pc .startLevel .l0SublevelInfo = generateSublevelInfo (cmp , pc .startLevel .files )
464
464
}
465
- return true
465
+
466
+ return ! outputKeyRangeAlreadyCompacting (cmp , inProgressCompactions , pc )
466
467
}
467
468
468
469
// grow grows the number of inputs at startLevel without changing the number of
@@ -586,17 +587,15 @@ func (pc *pickedTableCompaction) estimatedInputSize() uint64 {
586
587
587
588
// setupMultiLevelCandidate returns true if it successfully added another level
588
589
// to the compaction.
589
- func (pc * pickedTableCompaction ) setupMultiLevelCandidate (
590
- opts * Options , diskAvailBytes uint64 ,
591
- ) bool {
590
+ func (pc * pickedTableCompaction ) setupMultiLevelCandidate (opts * Options , env compactionEnv ) bool {
592
591
pc .inputs = append (pc .inputs , compactionLevel {level : pc .outputLevel .level + 1 })
593
592
594
593
// Recalibrate startLevel and outputLevel:
595
594
// - startLevel and outputLevel pointers may be obsolete after appending to pc.inputs.
596
595
// - push outputLevel to extraLevels and move the new level to outputLevel
597
596
pc .startLevel = & pc .inputs [0 ]
598
597
pc .outputLevel = & pc .inputs [2 ]
599
- return pc .setupInputs (opts , diskAvailBytes , & pc .inputs [1 ], nil /* TODO(radu) */ )
598
+ return pc .setupInputs (opts , env . diskAvailBytes , env . inProgressCompactions , & pc .inputs [1 ], nil /* TODO(radu) */ )
600
599
}
601
600
602
601
// canCompactTables returns true if the tables in the level slice are not
@@ -1398,9 +1397,7 @@ func (p *compactionPickerByScore) pickAutoScore(env compactionEnv) pickedCompact
1398
1397
1399
1398
if info .level == 0 {
1400
1399
ptc := pickL0 (env , p .opts , p .vers , p .latestVersionState .l0Organizer , p .baseLevel )
1401
- // Fail-safe to protect against compacting the same sstable
1402
- // concurrently.
1403
- if ptc != nil && ! inputRangeAlreadyCompacting (p .opts .Comparer .Compare , env , ptc ) {
1400
+ if ptc != nil {
1404
1401
p .addScoresToPickedCompactionMetrics (ptc , scores )
1405
1402
ptc .score = info .score
1406
1403
if false {
@@ -1419,8 +1416,7 @@ func (p *compactionPickerByScore) pickAutoScore(env compactionEnv) pickedCompact
1419
1416
}
1420
1417
1421
1418
pc := pickAutoLPositive (env , p .opts , p .vers , p .latestVersionState .l0Organizer , * info , p .baseLevel )
1422
- // Fail-safe to protect against compacting the same sstable concurrently.
1423
- if pc != nil && ! inputRangeAlreadyCompacting (p .opts .Comparer .Compare , env , pc ) {
1419
+ if pc != nil {
1424
1420
p .addScoresToPickedCompactionMetrics (pc , scores )
1425
1421
pc .score = info .score
1426
1422
if false {
@@ -1612,17 +1608,10 @@ func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
1612
1608
startLevel , outputLevel , p .baseLevel )
1613
1609
pc .kind = kind
1614
1610
pc .startLevel .files = inputs
1615
- pc .bounds = manifest .KeyRange (p .opts .Comparer .Compare , pc .startLevel .files .All ())
1616
1611
1617
- // Fail-safe to protect against compacting the same sstable concurrently.
1618
- if inputRangeAlreadyCompacting (p .opts .Comparer .Compare , env , pc ) {
1612
+ if ! pc .setupInputs (p .opts , env .diskAvailBytes , env .inProgressCompactions , pc .startLevel , env .problemSpans ) {
1619
1613
return nil
1620
1614
}
1621
-
1622
- if ! pc .setupInputs (p .opts , env .diskAvailBytes , pc .startLevel , env .problemSpans ) {
1623
- return nil
1624
- }
1625
-
1626
1615
return pc
1627
1616
}
1628
1617
@@ -1787,15 +1776,15 @@ func pickAutoLPositive(
1787
1776
}
1788
1777
pc .startLevel .files = cInfo .file .Slice ()
1789
1778
1790
- if ! pc .setupInputs (opts , env .diskAvailBytes , pc .startLevel , env .problemSpans ) {
1779
+ if ! pc .setupInputs (opts , env .diskAvailBytes , env . inProgressCompactions , pc .startLevel , env .problemSpans ) {
1791
1780
return nil
1792
1781
}
1793
- return pc .maybeAddLevel (opts , env . diskAvailBytes )
1782
+ return pc .maybeAddLevel (opts , env )
1794
1783
}
1795
1784
1796
1785
// maybeAddLevel maybe adds a level to the picked compaction.
1797
1786
func (pc * pickedTableCompaction ) maybeAddLevel (
1798
- opts * Options , diskAvailBytes uint64 ,
1787
+ opts * Options , env compactionEnv ,
1799
1788
) * pickedTableCompaction {
1800
1789
pc .pickerMetrics .singleLevelOverlappingRatio = pc .overlappingRatio ()
1801
1790
if pc .outputLevel .level == numLevels - 1 {
@@ -1806,17 +1795,17 @@ func (pc *pickedTableCompaction) maybeAddLevel(
1806
1795
return pc
1807
1796
}
1808
1797
targetFileSize := opts .TargetFileSize (pc .outputLevel .level , pc .baseLevel )
1809
- if pc .estimatedInputSize () > expandedCompactionByteSizeLimit (opts , targetFileSize , diskAvailBytes ) {
1798
+ if pc .estimatedInputSize () > expandedCompactionByteSizeLimit (opts , targetFileSize , env . diskAvailBytes ) {
1810
1799
// Don't add a level if the current compaction exceeds the compaction size limit
1811
1800
return pc
1812
1801
}
1813
- return opts .Experimental .MultiLevelCompactionHeuristic .pick (pc , opts , diskAvailBytes )
1802
+ return opts .Experimental .MultiLevelCompactionHeuristic .pick (pc , opts , env )
1814
1803
}
1815
1804
1816
1805
// MultiLevelHeuristic evaluates whether to add files from the next level into the compaction.
1817
1806
type MultiLevelHeuristic interface {
1818
1807
// Evaluate returns the preferred compaction.
1819
- pick (pc * pickedTableCompaction , opts * Options , diskAvailBytes uint64 ) * pickedTableCompaction
1808
+ pick (pc * pickedTableCompaction , opts * Options , env compactionEnv ) * pickedTableCompaction
1820
1809
1821
1810
// Returns if the heuristic allows L0 to be involved in ML compaction
1822
1811
allowL0 () bool
@@ -1831,7 +1820,7 @@ type NoMultiLevel struct{}
1831
1820
var _ MultiLevelHeuristic = (* NoMultiLevel )(nil )
1832
1821
1833
1822
func (nml NoMultiLevel ) pick (
1834
- pc * pickedTableCompaction , opts * Options , diskAvailBytes uint64 ,
1823
+ pc * pickedTableCompaction , opts * Options , env compactionEnv ,
1835
1824
) * pickedTableCompaction {
1836
1825
return pc
1837
1826
}
@@ -1887,17 +1876,17 @@ var _ MultiLevelHeuristic = (*WriteAmpHeuristic)(nil)
1887
1876
// in-progress flushes and compactions from completing, etc. Consider ways to
1888
1877
// deduplicate work, given that setupInputs has already been called.
1889
1878
func (wa WriteAmpHeuristic ) pick (
1890
- pcOrig * pickedTableCompaction , opts * Options , diskAvailBytes uint64 ,
1879
+ pcOrig * pickedTableCompaction , opts * Options , env compactionEnv ,
1891
1880
) * pickedTableCompaction {
1892
1881
pcMulti := pcOrig .clone ()
1893
- if ! pcMulti .setupMultiLevelCandidate (opts , diskAvailBytes ) {
1882
+ if ! pcMulti .setupMultiLevelCandidate (opts , env ) {
1894
1883
return pcOrig
1895
1884
}
1896
1885
// We consider the addition of a level as an "expansion" of the compaction.
1897
1886
// If pcMulti is past the expanded compaction byte size limit already,
1898
1887
// we don't consider it.
1899
1888
targetFileSize := opts .TargetFileSize (pcMulti .outputLevel .level , pcMulti .baseLevel )
1900
- if pcMulti .estimatedInputSize () >= expandedCompactionByteSizeLimit (opts , targetFileSize , diskAvailBytes ) {
1889
+ if pcMulti .estimatedInputSize () >= expandedCompactionByteSizeLimit (opts , targetFileSize , env . diskAvailBytes ) {
1901
1890
return pcOrig
1902
1891
}
1903
1892
picked := pcOrig
@@ -1937,11 +1926,11 @@ func pickL0(
1937
1926
lcf := l0Organizer .PickBaseCompaction (opts .Logger , 1 , vers .Levels [baseLevel ].Slice (), baseLevel , env .problemSpans )
1938
1927
if lcf != nil {
1939
1928
pc := newPickedCompactionFromL0 (lcf , opts , vers , l0Organizer , baseLevel , true )
1940
- if pc .setupInputs (opts , env .diskAvailBytes , pc .startLevel , env .problemSpans ) {
1929
+ if pc .setupInputs (opts , env .diskAvailBytes , env . inProgressCompactions , pc .startLevel , env .problemSpans ) {
1941
1930
if pc .startLevel .files .Empty () {
1942
1931
opts .Logger .Errorf ("%v" , base .AssertionFailedf ("empty compaction chosen" ))
1943
1932
}
1944
- return pc .maybeAddLevel (opts , env . diskAvailBytes )
1933
+ return pc .maybeAddLevel (opts , env )
1945
1934
}
1946
1935
// TODO(radu): investigate why this happens.
1947
1936
// opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
@@ -1954,7 +1943,7 @@ func pickL0(
1954
1943
lcf = l0Organizer .PickIntraL0Compaction (env .earliestUnflushedSeqNum , minIntraL0Count , env .problemSpans )
1955
1944
if lcf != nil {
1956
1945
pc := newPickedCompactionFromL0 (lcf , opts , vers , l0Organizer , baseLevel , false )
1957
- if pc .setupInputs (opts , env .diskAvailBytes , pc .startLevel , env .problemSpans ) {
1946
+ if pc .setupInputs (opts , env .diskAvailBytes , env . inProgressCompactions , pc .startLevel , env .problemSpans ) {
1958
1947
if pc .startLevel .files .Empty () {
1959
1948
opts .Logger .Fatalf ("empty compaction chosen" )
1960
1949
}
@@ -2009,12 +1998,12 @@ func newPickedManualCompaction(
2009
1998
}
2010
1999
// We use nil problemSpans because we don't want problem spans to prevent
2011
2000
// manual compactions.
2012
- if ! pc .setupInputs (opts , env .diskAvailBytes , pc .startLevel , nil /* problemSpans */ ) {
2001
+ if ! pc .setupInputs (opts , env .diskAvailBytes , env . inProgressCompactions , pc .startLevel , nil /* problemSpans */ ) {
2013
2002
// setupInputs returned false indicating there's a conflicting
2014
2003
// concurrent compaction.
2015
2004
return nil , true
2016
2005
}
2017
- if pc = pc .maybeAddLevel (opts , env . diskAvailBytes ); pc == nil {
2006
+ if pc = pc .maybeAddLevel (opts , env ); pc == nil {
2018
2007
return nil , false
2019
2008
}
2020
2009
if pc .outputLevel .level != outputLevel {
@@ -2024,10 +2013,6 @@ func newPickedManualCompaction(
2024
2013
panic ("pebble: compaction picked unexpected output level" )
2025
2014
}
2026
2015
}
2027
- // Fail-safe to protect against compacting the same sstable concurrently.
2028
- if inputRangeAlreadyCompacting (opts .Comparer .Compare , env , pc ) {
2029
- return nil , true
2030
- }
2031
2016
return pc , false
2032
2017
}
2033
2018
@@ -2054,18 +2039,14 @@ func pickDownloadCompaction(
2054
2039
pc = newPickedTableCompaction (opts , vers , l0Organizer , level , level , baseLevel )
2055
2040
pc .kind = kind
2056
2041
pc .startLevel .files = manifest .NewLevelSliceKeySorted (opts .Comparer .Compare , []* manifest.TableMetadata {file })
2057
- if ! pc .setupInputs (opts , env .diskAvailBytes , pc .startLevel , nil /* problemSpans */ ) {
2042
+ if ! pc .setupInputs (opts , env .diskAvailBytes , env . inProgressCompactions , pc .startLevel , nil /* problemSpans */ ) {
2058
2043
// setupInputs returned false indicating there's a conflicting
2059
2044
// concurrent compaction.
2060
2045
return nil
2061
2046
}
2062
2047
if pc .outputLevel .level != level {
2063
2048
panic ("pebble: download compaction picked unexpected output level" )
2064
2049
}
2065
- // Fail-safe to protect against compacting the same sstable concurrently.
2066
- if inputRangeAlreadyCompacting (opts .Comparer .Compare , env , pc ) {
2067
- return nil
2068
- }
2069
2050
return pc
2070
2051
}
2071
2052
@@ -2106,10 +2087,7 @@ func pickReadTriggeredCompactionHelper(
2106
2087
rc .level , defaultOutputLevel (rc .level , p .baseLevel ), p .baseLevel )
2107
2088
2108
2089
pc .startLevel .files = overlapSlice
2109
- if ! pc .setupInputs (p .opts , env .diskAvailBytes , pc .startLevel , env .problemSpans ) {
2110
- return nil
2111
- }
2112
- if inputRangeAlreadyCompacting (p .opts .Comparer .Compare , env , pc ) {
2090
+ if ! pc .setupInputs (p .opts , env .diskAvailBytes , env .inProgressCompactions , pc .startLevel , env .problemSpans ) {
2113
2091
return nil
2114
2092
}
2115
2093
pc .kind = compactionKindRead
@@ -2134,17 +2112,11 @@ func (p *compactionPickerByScore) forceBaseLevel1() {
2134
2112
p .baseLevel = 1
2135
2113
}
2136
2114
2137
- func inputRangeAlreadyCompacting (
2138
- cmp base.Compare , env compactionEnv , pc * pickedTableCompaction ,
2115
+ // outputKeyRangeAlreadyCompacting checks if the input range of the picked
2116
+ // compaction is already being written to by an in-progress compaction.
2117
+ func outputKeyRangeAlreadyCompacting (
2118
+ cmp base.Compare , inProgressCompactions []compactionInfo , pc * pickedTableCompaction ,
2139
2119
) bool {
2140
- for _ , cl := range pc .inputs {
2141
- for f := range cl .files .All () {
2142
- if f .IsCompacting () {
2143
- return true
2144
- }
2145
- }
2146
- }
2147
-
2148
2120
// Look for active compactions outputting to the same region of the key
2149
2121
// space in the same output level. Two potential compactions may conflict
2150
2122
// without sharing input files if there are no files in the output level
@@ -2174,7 +2146,7 @@ func inputRangeAlreadyCompacting(
2174
2146
//
2175
2147
// * - currently compacting
2176
2148
if pc .outputLevel != nil && pc .outputLevel .level != 0 {
2177
- for _ , c := range env . inProgressCompactions {
2149
+ for _ , c := range inProgressCompactions {
2178
2150
if pc .outputLevel .level != c .outputLevel {
2179
2151
continue
2180
2152
}
0 commit comments