Skip to content

Commit 2c8ccd6

Browse files
committed
db: fix L0Organizer race during logAndApply
In `logAndApply`, we `Apply` the bulk version edit without holding `DB.mu`. `Apply` internally updates `L0Organizer` which is supposed to be protected by `DB.mu`. Note that the same thing was happening before the `L0Organizer`, but because the incremental updating of sublevels doesn't work with compactions, we were always generating new `L0Sublevels`, so the previous version's sublevels were still usable. We separate the updating of L0 sublevels from `Apply` and split it into two steps so we can perform as much work as possible outside of DB.mu.
1 parent ae524f9 commit 2c8ccd6

File tree

11 files changed

+113
-65
lines changed

11 files changed

+113
-65
lines changed

internal/keyspan/keyspanimpl/level_iter_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,9 @@ func TestLevelIterEquivalence(t *testing.T) {
314314
b.AddedTables[6] = amap
315315
l0Organizer := manifest.NewL0Organizer(base.DefaultComparer, 0 /* flushSplitBytes */)
316316
emptyVersion := manifest.NewInitialVersion(base.DefaultComparer)
317-
v, err := b.Apply(emptyVersion, l0Organizer, 0)
317+
v, err := b.Apply(emptyVersion, 0)
318318
require.NoError(t, err)
319+
l0Organizer.PerformUpdate(l0Organizer.PrepareUpdate(b, v), v)
319320
levelIter.Init(
320321
context.Background(),
321322
keyspan.SpanIterOptions{}, base.DefaultComparer.Compare, tableNewIters,

internal/manifest/l0_sublevels.go

Lines changed: 83 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,6 +2073,7 @@ type L0Organizer struct {
20732073
cmp base.Compare
20742074
formatKey base.FormatKey
20752075
flushSplitBytes int64
2076+
generation int64
20762077

20772078
// levelMetadata is the current L0.
20782079
levelMetadata LevelMetadata
@@ -2108,68 +2109,108 @@ func NewL0Organizer(comparer *base.Comparer, flushSplitBytes int64) *L0Organizer
21082109
return o
21092110
}
21102111

2111-
// SublevelFiles returns the sublevels as LevelSlices. The returned value (both
2112-
// the slice and each LevelSlice) is immutable. The L0Organizer creates new
2113-
// slices every time L0 changes.
2114-
func (o *L0Organizer) SublevelFiles() []LevelSlice {
2115-
return o.l0Sublevels.Levels
2116-
}
2117-
2118-
// Update the L0 organizer with the given L0 changes.
2119-
func (o *L0Organizer) Update(
2120-
addedL0Tables map[base.FileNum]*TableMetadata,
2121-
deletedL0Tables map[base.FileNum]*TableMetadata,
2122-
newLevelMeta *LevelMetadata,
2123-
) {
2112+
// PrepareUpdate is the first step in the two-step process to update the
2113+
// L0Organizer. This first step performs as much work as it can without
2114+
// modifying the L0Organizer.
2115+
//
2116+
// This method can be called concurrently with other methods (other than
2117+
// PerformUpdate). It allows doing most of the update work outside an important
2118+
// lock.
2119+
func (o *L0Organizer) PrepareUpdate(bve *BulkVersionEdit, newVersion *Version) L0PreparedUpdate {
2120+
addedL0Tables := bve.AddedTables[0]
2121+
deletedL0Tables := bve.DeletedTables[0]
2122+
newLevelMeta := &newVersion.Levels[0]
21242123
if invariants.Enabled && invariants.Sometimes(10) {
21252124
// Verify that newLevelMeta = m.levelMetadata + addedL0Tables - deletedL0Tables.
21262125
verifyLevelMetadataTransition(&o.levelMetadata, newLevelMeta, addedL0Tables, deletedL0Tables)
21272126
}
2128-
o.levelMetadata = *newLevelMeta
2127+
21292128
if len(addedL0Tables) == 0 && len(deletedL0Tables) == 0 {
2130-
return
2129+
return L0PreparedUpdate{
2130+
generation: o.generation,
2131+
newSublevels: o.l0Sublevels,
2132+
}
21312133
}
2132-
// If we only added tables, try to use addL0Files.
2134+
21332135
if len(deletedL0Tables) == 0 {
21342136
if files, ok := o.l0Sublevels.canUseAddL0Files(addedL0Tables, newLevelMeta); ok {
2135-
newSublevels := o.l0Sublevels.addL0Files(files, o.flushSplitBytes, newLevelMeta)
2136-
// In invariants mode, sometimes rebuild from scratch to verify that
2137-
// AddL0Files did the right thing. Note that NewL0Sublevels updates
2138-
// fields in TableMetadata like L0Index, so we don't want to do this
2139-
// every time.
2140-
if invariants.Enabled && invariants.Sometimes(10) {
2141-
expectedSublevels, err := newL0Sublevels(newLevelMeta, o.cmp, o.formatKey, o.flushSplitBytes)
2142-
if err != nil {
2143-
panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
2144-
}
2145-
s1 := describeSublevels(o.formatKey, false /* verbose */, expectedSublevels.Levels)
2146-
s2 := describeSublevels(o.formatKey, false /* verbose */, newSublevels.Levels)
2147-
if s1 != s2 {
2148-
// Add verbosity.
2149-
s1 := describeSublevels(o.formatKey, true /* verbose */, expectedSublevels.Levels)
2150-
s2 := describeSublevels(o.formatKey, true /* verbose */, newSublevels.Levels)
2151-
panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
2152-
}
2137+
return L0PreparedUpdate{
2138+
generation: o.generation,
2139+
addL0Files: files,
21532140
}
2154-
o.l0Sublevels = newSublevels
2155-
return
21562141
}
21572142
}
2158-
var err error
2159-
o.l0Sublevels, err = newL0Sublevels(newLevelMeta, o.cmp, o.formatKey, o.flushSplitBytes)
2143+
newSublevels, err := newL0Sublevels(newLevelMeta, o.cmp, o.formatKey, o.flushSplitBytes)
21602144
if err != nil {
21612145
panic(errors.AssertionFailedf("error generating L0Sublevels: %s", err))
21622146
}
2147+
2148+
return L0PreparedUpdate{
2149+
generation: o.generation,
2150+
newSublevels: newSublevels,
2151+
}
2152+
}
2153+
2154+
// L0PreparedUpdate is returned by L0Organizer.PrepareUpdate(), to be passed to
2155+
// PerformUpdate().
2156+
type L0PreparedUpdate struct {
2157+
generation int64
2158+
2159+
// Exactly one of the following fields will be set.
2160+
addL0Files []*TableMetadata
2161+
newSublevels *l0Sublevels
2162+
}
2163+
2164+
// PerformUpdate applies an update the L0 organizer which was previously
2165+
// prepared using PrepareUpdate.
2166+
//
2167+
// Sets newVersion.L0SublevelFiles (which is immutable once set).
2168+
//
2169+
// This method cannot be called concurrently with any other methods.
2170+
func (o *L0Organizer) PerformUpdate(prepared L0PreparedUpdate, newVersion *Version) {
2171+
if prepared.generation != o.generation {
2172+
panic("invalid L0 update generation")
2173+
}
2174+
o.levelMetadata = newVersion.Levels[0]
2175+
o.generation++
2176+
if prepared.addL0Files != nil {
2177+
newSublevels := o.l0Sublevels.addL0Files(prepared.addL0Files, o.flushSplitBytes, &o.levelMetadata)
2178+
// In invariants mode, sometimes rebuild from scratch to verify that
2179+
// AddL0Files did the right thing. Note that NewL0Sublevels updates
2180+
// fields in TableMetadata like L0Index, so we don't want to do this
2181+
// every time.
2182+
if invariants.Enabled && invariants.Sometimes(10) {
2183+
expectedSublevels, err := newL0Sublevels(&o.levelMetadata, o.cmp, o.formatKey, o.flushSplitBytes)
2184+
if err != nil {
2185+
panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
2186+
}
2187+
s1 := describeSublevels(o.formatKey, false /* verbose */, expectedSublevels.Levels)
2188+
s2 := describeSublevels(o.formatKey, false /* verbose */, newSublevels.Levels)
2189+
if s1 != s2 {
2190+
// Add verbosity.
2191+
s1 := describeSublevels(o.formatKey, true /* verbose */, expectedSublevels.Levels)
2192+
s2 := describeSublevels(o.formatKey, true /* verbose */, newSublevels.Levels)
2193+
panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
2194+
}
2195+
}
2196+
o.l0Sublevels = newSublevels
2197+
} else {
2198+
o.l0Sublevels = prepared.newSublevels
2199+
}
2200+
newVersion.L0SublevelFiles = o.l0Sublevels.Levels
21632201
}
21642202

2165-
// ResetForTesting reinitializes the L0Organizer to reflect a given L0 level.
2166-
func (o *L0Organizer) ResetForTesting(levelMetadata *LevelMetadata) {
2167-
o.levelMetadata = *levelMetadata
2203+
// ResetForTesting reinitializes the L0Organizer to reflect the given version.
2204+
// Sets v.L0SublevelFiles.
2205+
func (o *L0Organizer) ResetForTesting(v *Version) {
2206+
o.levelMetadata = v.Levels[0]
2207+
o.generation = 0
21682208
var err error
2169-
o.l0Sublevels, err = newL0Sublevels(levelMetadata, o.cmp, o.formatKey, o.flushSplitBytes)
2209+
o.l0Sublevels, err = newL0Sublevels(&v.Levels[0], o.cmp, o.formatKey, o.flushSplitBytes)
21702210
if err != nil {
21712211
panic(errors.AssertionFailedf("error generating L0Sublevels: %s", err))
21722212
}
2213+
v.L0SublevelFiles = o.l0Sublevels.Levels
21732214
}
21742215

21752216
// verifyLevelMetadataTransition verifies that newLevel matches oldLevel after

internal/manifest/l0_sublevels_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ func readManifest(filename string) (*Version, error) {
5454
if err := bve.Accumulate(&ve); err != nil {
5555
return nil, err
5656
}
57-
if v, err = bve.Apply(v, l0Organizer, 32000); err != nil {
57+
if v, err = bve.Apply(v, 32000); err != nil {
5858
return nil, err
5959
}
60+
l0Organizer.PerformUpdate(l0Organizer.PrepareUpdate(&bve, v), v)
6061
}
6162
return v, nil
6263
}

internal/manifest/manifest_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ func replayManifest(
128128
}
129129
l0Organizer := manifest.NewL0Organizer(cmp, opts.FlushSplitBytes)
130130
emptyVersion := manifest.NewInitialVersion(cmp)
131-
v, err := bve.Apply(emptyVersion, l0Organizer, opts.Experimental.ReadCompactionRate)
131+
v, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
132132
require.NoError(t, err)
133+
l0Organizer.PerformUpdate(l0Organizer.PrepareUpdate(&bve, v), v)
133134
return v, l0Organizer
134135
}

internal/manifest/version.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,8 +1214,7 @@ func NewVersionForTesting(
12141214
v.Levels[l].totalSize += f.Size
12151215
}
12161216
}
1217-
l0Organizer.ResetForTesting(&v.Levels[0])
1218-
v.L0SublevelFiles = l0Organizer.SublevelFiles()
1217+
l0Organizer.ResetForTesting(v)
12191218
return v
12201219
}
12211220

internal/manifest/version_edit.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,9 +1149,10 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
11491149
// the levels.
11501150
//
11511151
// curr may be nil, which is equivalent to a pointer to a zero version.
1152-
func (b *BulkVersionEdit) Apply(
1153-
curr *Version, l0Organizer *L0Organizer, readCompactionRate int64,
1154-
) (*Version, error) {
1152+
//
1153+
// Not that L0SublevelFiles is not initialized in the returned version; it is
1154+
// the caller's responsibility to set it using L0Organizer.PerformUpdate().
1155+
func (b *BulkVersionEdit) Apply(curr *Version, readCompactionRate int64) (*Version, error) {
11551156
comparer := curr.cmp
11561157
v := &Version{
11571158
cmp: comparer,
@@ -1293,9 +1294,6 @@ func (b *BulkVersionEdit) Apply(
12931294
}
12941295
}
12951296

1296-
l0Organizer.Update(b.AddedTables[0], b.DeletedTables[0], &v.Levels[0])
1297-
v.L0SublevelFiles = l0Organizer.SublevelFiles()
1298-
12991297
// We maintain stats about active references in blob files and can infer
13001298
// when a blob file has become a 'zombie,' and is no longer referenced in
13011299
// the resulting version. However, we expect the caller to explicitly list

internal/manifest/version_edit_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,10 +449,11 @@ func TestVersionEditApply(t *testing.T) {
449449
if l0Organizer == nil {
450450
d.Fatalf(t, "no L0 organizer")
451451
}
452-
newv, err := bve.Apply(v, l0Organizer, readCompactionRate)
452+
newv, err := bve.Apply(v, readCompactionRate)
453453
if err != nil {
454454
return err.Error()
455455
}
456+
l0Organizer.PerformUpdate(l0Organizer.PrepareUpdate(&bve, newv), newv)
456457
if saveName != "" {
457458
versions[saveName] = newv
458459
l0Organizers[saveName] = l0Organizer
@@ -461,7 +462,7 @@ func TestVersionEditApply(t *testing.T) {
461462
// Reinitialize the L0 organizer in case we want to use the same version
462463
// again (l0Organizer now reflects newv).
463464
l0Organizer = NewL0Organizer(base.DefaultComparer, flushSplitBytes)
464-
l0Organizer.ResetForTesting(&v.Levels[0])
465+
l0Organizer.ResetForTesting(v)
465466
l0Organizers[name] = l0Organizer
466467

467468
return newv.DebugString()

replay/replay.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -802,10 +802,11 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
802802
}
803803

804804
// Apply the edit.
805-
v, err = bve.Apply(v, l0Organizer, r.Opts.Experimental.ReadCompactionRate)
805+
v, err = bve.Apply(v, r.Opts.Experimental.ReadCompactionRate)
806806
if err != nil {
807807
return err
808808
}
809+
l0Organizer.PerformUpdate(l0Organizer.PrepareUpdate(&bve, v), v)
809810
// AddedTablesByFileNum maps file number to table metadata for all added
810811
// sstables from accumulated version edits so we must retain it.
811812
bve = manifest.BulkVersionEdit{AddedTablesByFileNum: bve.AddedTablesByFileNum}

tool/db.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,10 +723,11 @@ func (d *dbT) runProperties(cmd *cobra.Command, args []string) {
723723
}
724724
l0Organizer := manifest.NewL0Organizer(cmp, d.opts.FlushSplitBytes)
725725
emptyVersion := manifest.NewInitialVersion(cmp)
726-
v, err := bve.Apply(emptyVersion, l0Organizer, d.opts.Experimental.ReadCompactionRate)
726+
v, err := bve.Apply(emptyVersion, d.opts.Experimental.ReadCompactionRate)
727727
if err != nil {
728728
return err
729729
}
730+
l0Organizer.PerformUpdate(l0Organizer.PrepareUpdate(&bve, v), v)
730731

731732
objProvider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(d.opts.FS, dirname))
732733
if err != nil {

tool/manifest.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,12 @@ func (m *manifestT) runDump(cmd *cobra.Command, args []string) {
243243
if comparer != nil {
244244
l0Organizer := manifest.NewL0Organizer(comparer, 0 /* flushSplitBytes */)
245245
emptyVersion := manifest.NewInitialVersion(comparer)
246-
v, err := bve.Apply(emptyVersion, l0Organizer, m.opts.Experimental.ReadCompactionRate)
246+
v, err := bve.Apply(emptyVersion, m.opts.Experimental.ReadCompactionRate)
247247
if err != nil {
248248
fmt.Fprintf(stdout, "%s\n", err)
249249
return
250250
}
251+
l0Organizer.PerformUpdate(l0Organizer.PrepareUpdate(&bve, v), v)
251252
m.printLevels(comparer.Compare, stdout, v)
252253
}
253254
}()
@@ -628,7 +629,7 @@ func (m *manifestT) runCheck(cmd *cobra.Command, args []string) {
628629
l0Organizer = manifest.NewL0Organizer(cmp, 0 /* flushSplitBytes */)
629630
v = manifest.NewInitialVersion(cmp)
630631
}
631-
newv, err := bve.Apply(v, l0Organizer, m.opts.Experimental.ReadCompactionRate)
632+
newv, err := bve.Apply(v, m.opts.Experimental.ReadCompactionRate)
632633
if err != nil {
633634
fmt.Fprintf(stdout, "%s: offset: %d err: %s\n",
634635
arg, offset, err)
@@ -648,6 +649,7 @@ func (m *manifestT) runCheck(cmd *cobra.Command, args []string) {
648649
ok = false
649650
break
650651
}
652+
l0Organizer.PerformUpdate(l0Organizer.PrepareUpdate(&bve, newv), newv)
651653
v = newv
652654
}
653655
}()

0 commit comments

Comments
 (0)