Skip to content

Commit 02cd5b3

Browse files
committed
manifest: add L0Organizer
We introduce `L0Organizer` type which contains the logic for updating `L0Sublevels`. A single `L0Organizer` per store keeps track of the current L0 state (corresponding to the latest `Version`).
1 parent df05e35 commit 02cd5b3

22 files changed

+269
-124
lines changed

compaction_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ import (
4141
)
4242

4343
func newVersion(opts *Options, files [numLevels][]*tableMetadata) *version {
44+
l0Organizer := manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes)
4445
v := manifest.NewVersionForTesting(
4546
opts.Comparer,
46-
opts.FlushSplitBytes,
47+
l0Organizer,
4748
files)
4849
if err := v.CheckOrdering(); err != nil {
4950
panic(err)

download_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ func TestDownloadCursor(t *testing.T) {
3333
case "define":
3434
var err error
3535
const flushSplitBytes = 10 * 1024 * 1024
36-
vers, err = manifest.ParseVersionDebug(base.DefaultComparer, flushSplitBytes, td.Input)
36+
l0Organizer := manifest.NewL0Organizer(base.DefaultComparer, flushSplitBytes)
37+
vers, err = manifest.ParseVersionDebug(base.DefaultComparer, l0Organizer, td.Input)
3738
if err != nil {
3839
td.Fatalf(t, "%v", err)
3940
}
@@ -118,7 +119,8 @@ func TestDownloadTask(t *testing.T) {
118119
case "define":
119120
var err error
120121
const flushSplitBytes = 10 * 1024 * 1024
121-
vers, err = manifest.ParseVersionDebug(base.DefaultComparer, flushSplitBytes, td.Input)
122+
l0Organizer := manifest.NewL0Organizer(base.DefaultComparer, flushSplitBytes)
123+
vers, err = manifest.ParseVersionDebug(base.DefaultComparer, l0Organizer, td.Input)
122124
if err != nil {
123125
td.Fatalf(t, "%v", err)
124126
}

get_iter_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,8 @@ func TestGetIter(t *testing.T) {
429429

430430
files[tt.level] = append(files[tt.level], meta)
431431
}
432-
v := manifest.NewVersionForTesting(cmp, 10<<20, files)
432+
l0Organizer := manifest.NewL0Organizer(cmp, 10<<20 /*flushSplitBytes*/)
433+
v := manifest.NewVersionForTesting(cmp, l0Organizer, files)
433434
err := v.CheckOrdering()
434435
if tc.badOrdering && err == nil {
435436
t.Errorf("desc=%q: want bad ordering, got nil error", desc)

internal/compact/run_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ func TestTableSplitLimit(t *testing.T) {
2323
case "define":
2424
var flushSplitBytes int64
2525
d.MaybeScanArgs(t, "flush-split-bytes", &flushSplitBytes)
26-
v = testutils.CheckErr(manifest.ParseVersionDebug(base.DefaultComparer, flushSplitBytes, d.Input))
26+
l0Organizer := manifest.NewL0Organizer(base.DefaultComparer, flushSplitBytes)
27+
v = testutils.CheckErr(manifest.ParseVersionDebug(base.DefaultComparer, l0Organizer, d.Input))
2728
buf.WriteString(v.String())
2829
if v.Levels[0].Len() != 0 {
2930
buf.WriteString("flush split keys:\n")

internal/compact/splitting_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ func TestOutputSplitter(t *testing.T) {
3434
files[1] = append(files[1], f)
3535
}
3636
}
37-
v := manifest.NewVersionForTesting(base.DefaultComparer, 64*1024, files)
37+
l0Organizer := manifest.NewL0Organizer(base.DefaultComparer, 64*1024 /* flushSplitBytes */)
38+
v := manifest.NewVersionForTesting(base.DefaultComparer, l0Organizer, files)
3839
if err := v.CheckOrdering(); err != nil {
3940
d.Fatalf(t, "%v", err)
4041
}

internal/compact/tombstone_elision_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ func TestSetupTombstoneElision(t *testing.T) {
8989
switch td.Cmd {
9090
case "define":
9191
var err error
92-
v, err = manifest.ParseVersionDebug(base.DefaultComparer, 64*1024, td.Input)
92+
l0Organizer := manifest.NewL0Organizer(base.DefaultComparer, 64*1024 /* flushSplitBytes */)
93+
v, err = manifest.ParseVersionDebug(base.DefaultComparer, l0Organizer, td.Input)
9394
if err != nil {
9495
td.Fatalf(t, "%v", err)
9596
}
@@ -129,7 +130,8 @@ func TestTombstoneElision(t *testing.T) {
129130
switch td.Cmd {
130131
case "define":
131132
var err error
132-
v, err = manifest.ParseVersionDebug(base.DefaultComparer, 64*1024, td.Input)
133+
l0Organizer := manifest.NewL0Organizer(base.DefaultComparer, 64*1024 /* flushSplitBytes */)
134+
v, err = manifest.ParseVersionDebug(base.DefaultComparer, l0Organizer, td.Input)
133135
if err != nil {
134136
td.Fatalf(t, "%v", err)
135137
}

internal/keyspan/keyspanimpl/level_iter_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,9 @@ func TestLevelIterEquivalence(t *testing.T) {
312312
amap[metas[i].FileNum] = metas[i]
313313
}
314314
b.AddedTables[6] = amap
315-
v, err := b.Apply(manifest.NewVersion(base.DefaultComparer), 0, 0)
315+
l0Organizer := manifest.NewL0Organizer(base.DefaultComparer, 0 /* flushSplitBytes */)
316+
emptyVersion := manifest.NewInitialVersion(base.DefaultComparer, l0Organizer)
317+
v, err := b.Apply(emptyVersion, l0Organizer, 0)
316318
require.NoError(t, err)
317319
levelIter.Init(
318320
context.Background(),

internal/manifest/annotator_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ func makeTestVersion(numFiles int) (*Version, []*TableMetadata) {
2626
var levelFiles [7][]*TableMetadata
2727
levelFiles[6] = files
2828

29-
v := NewVersionForTesting(base.DefaultComparer, 0, levelFiles)
29+
l0Organizer := NewL0Organizer(base.DefaultComparer, 0 /* flushSplitBytes */)
30+
v := NewVersionForTesting(base.DefaultComparer, l0Organizer, levelFiles)
3031
return v, files
3132
}
3233

internal/manifest/l0_sublevels.go

Lines changed: 164 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ type L0Sublevels struct {
249249

250250
fileBytes uint64
251251
// All the L0 files, ordered from oldest to youngest.
252-
levelMetadata *LevelMetadata
252+
levelMetadata LevelMetadata
253253

254254
// The file intervals in increasing key order.
255255
orderedIntervals []fileInterval
@@ -281,7 +281,7 @@ func NewL0Sublevels(
281281
levelMetadata *LevelMetadata, cmp Compare, formatKey base.FormatKey, flushSplitMaxBytes int64,
282282
) (*L0Sublevels, error) {
283283
s := &L0Sublevels{cmp: cmp, formatKey: formatKey}
284-
s.levelMetadata = levelMetadata
284+
s.levelMetadata = *levelMetadata
285285
keys := make([]intervalKeyTemp, 0, 2*s.levelMetadata.Len())
286286
iter := levelMetadata.Iter()
287287
for i, f := 0, iter.First(); f != nil; i, f = i+1, iter.Next() {
@@ -480,7 +480,7 @@ func (s *L0Sublevels) AddL0Files(
480480
*newVal = *s
481481

482482
newVal.addL0FilesCalled = false
483-
newVal.levelMetadata = levelMetadata
483+
newVal.levelMetadata = *levelMetadata
484484
// Deep copy levelFiles and Levels, as they are mutated and sorted below.
485485
// Shallow copies of slices that we just append to, are okay.
486486
newVal.levelFiles = make([][]*TableMetadata, len(s.levelFiles))
@@ -2073,3 +2073,164 @@ func (s *L0Sublevels) extendCandidateToRectangle(
20732073
}
20742074
return addedCount > 0
20752075
}
2076+
2077+
// L0Organizer keeps track of L0 state, including the subdivision into
2078+
// sublevels.
2079+
//
2080+
// It is designed to be used as a singleton (per store) which gets updated as
2081+
// the version changes. It is used to initialize L0-related Version fields.
2082+
type L0Organizer struct {
2083+
cmp base.Compare
2084+
formatKey base.FormatKey
2085+
flushSplitBytes int64
2086+
2087+
// levelMetadata is the current L0.
2088+
levelMetadata LevelMetadata
2089+
2090+
// sublevels reflects the current L0.
2091+
sublevels *L0Sublevels
2092+
}
2093+
2094+
// NewL0Organizer creates the L0 organizer. The L0 organizer is responsible for
2095+
// maintaining the current L0 state and is kept in-sync with the current Version.
2096+
//
2097+
// flushSplitBytes denotes the target number of bytes per sublevel in each flush
2098+
// split interval (i.e. range between two flush split keys) in L0 sstables. When
2099+
// set to zero, only a single sstable is generated by each flush. When set to a
2100+
// non-zero value, flushes are split at points to meet L0's TargetFileSize, any
2101+
// grandparent-related overlap options, and at boundary keys of L0 flush split
2102+
// intervals (which are targeted to contain around FlushSplitBytes bytes in each
2103+
// sublevel between pairs of boundary keys). Splitting sstables during flush
2104+
// allows increased compaction flexibility and concurrency when those tables are
2105+
// compacted to lower levels.
2106+
func NewL0Organizer(comparer *base.Comparer, flushSplitBytes int64) *L0Organizer {
2107+
o := &L0Organizer{
2108+
cmp: comparer.Compare,
2109+
formatKey: comparer.FormatKey,
2110+
flushSplitBytes: flushSplitBytes,
2111+
levelMetadata: MakeLevelMetadata(comparer.Compare, 0, nil),
2112+
}
2113+
var err error
2114+
o.sublevels, err = NewL0Sublevels(&o.levelMetadata, o.cmp, o.formatKey, o.flushSplitBytes)
2115+
if err != nil {
2116+
panic(errors.AssertionFailedf("error generating empty L0Sublevels: %s", err))
2117+
}
2118+
return o
2119+
}
2120+
2121+
// Sublevels returns the *L0Sublevels reflecting the current L0 state.
2122+
func (o *L0Organizer) Sublevels() *L0Sublevels {
2123+
return o.sublevels
2124+
}
2125+
2126+
// Update the L0 organizer with the given L0 changes.
2127+
func (o *L0Organizer) Update(
2128+
addedL0Tables map[base.FileNum]*TableMetadata,
2129+
deletedL0Tables map[base.FileNum]*TableMetadata,
2130+
newLevelMeta *LevelMetadata,
2131+
) {
2132+
if invariants.Enabled && invariants.Sometimes(10) {
2133+
// Verify that newLevelMeta = m.levelMetadata + addedL0Tables - deletedL0Tables.
2134+
verifyLevelMetadataTransition(&o.levelMetadata, newLevelMeta, addedL0Tables, deletedL0Tables)
2135+
}
2136+
o.levelMetadata = *newLevelMeta
2137+
if len(addedL0Tables) == 0 && len(deletedL0Tables) == 0 {
2138+
return
2139+
}
2140+
// If we only added tables, try to use AddL0Files.
2141+
if len(deletedL0Tables) == 0 {
2142+
// Construct the file slice needed by AddL0Files.
2143+
// TODO(radu): change AddL0Files to do this internally.
2144+
files := make([]*TableMetadata, 0, len(addedL0Tables))
2145+
iter := newLevelMeta.Iter()
2146+
for t := iter.Last(); len(files) < len(addedL0Tables); t = iter.Prev() {
2147+
if t == nil || addedL0Tables[t.FileNum] == nil {
2148+
break
2149+
}
2150+
files = append(files, t)
2151+
}
2152+
if len(files) == len(addedL0Tables) {
2153+
slices.Reverse(files)
2154+
newSublevels, err := o.sublevels.AddL0Files(files, o.flushSplitBytes, newLevelMeta)
2155+
if err == nil {
2156+
// In invariants mode, sometimes rebuild from scratch to verify that
2157+
// AddL0Files did the right thing. Note that NewL0Sublevels updates
2158+
// fields in TableMetadata like L0Index, so we don't want to do this
2159+
// every time.
2160+
if invariants.Enabled && invariants.Sometimes(10) {
2161+
expectedSublevels, err := NewL0Sublevels(newLevelMeta, o.cmp, o.formatKey, o.flushSplitBytes)
2162+
if err != nil {
2163+
panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
2164+
}
2165+
s1 := describeSublevels(o.formatKey, false /* verbose */, expectedSublevels.Levels)
2166+
s2 := describeSublevels(o.formatKey, false /* verbose */, newSublevels.Levels)
2167+
if s1 != s2 {
2168+
// Add verbosity.
2169+
s1 := describeSublevels(o.formatKey, true /* verbose */, expectedSublevels.Levels)
2170+
s2 := describeSublevels(o.formatKey, true /* verbose */, newSublevels.Levels)
2171+
panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
2172+
}
2173+
}
2174+
o.sublevels = newSublevels
2175+
return
2176+
}
2177+
if !errors.Is(err, errInvalidL0SublevelsOpt) {
2178+
panic(errors.AssertionFailedf("error generating L0Sublevels: %s", err))
2179+
}
2180+
}
2181+
}
2182+
var err error
2183+
o.sublevels, err = NewL0Sublevels(newLevelMeta, o.cmp, o.formatKey, o.flushSplitBytes)
2184+
if err != nil {
2185+
panic(errors.AssertionFailedf("error generating L0Sublevels: %s", err))
2186+
}
2187+
}
2188+
2189+
// Reset the L0Organizer to reflect a given L0 level. Used for testing.
2190+
func (o *L0Organizer) Reset(levelMetadata *LevelMetadata) {
2191+
o.levelMetadata = *levelMetadata
2192+
var err error
2193+
o.sublevels, err = NewL0Sublevels(levelMetadata, o.cmp, o.formatKey, o.flushSplitBytes)
2194+
if err != nil {
2195+
panic(errors.AssertionFailedf("error generating L0Sublevels: %s", err))
2196+
}
2197+
}
2198+
2199+
// verifyLevelMetadataTransition verifies that newLevel matches oldLevel after
2200+
// adding and removing the specified tables.
2201+
func verifyLevelMetadataTransition(
2202+
oldLevel, newLevel *LevelMetadata,
2203+
addedTables map[base.FileNum]*TableMetadata,
2204+
deletedTables map[base.FileNum]*TableMetadata,
2205+
) {
2206+
m := make(map[base.FileNum]*TableMetadata, oldLevel.Len())
2207+
iter := oldLevel.Iter()
2208+
for t := iter.First(); t != nil; t = iter.Next() {
2209+
m[t.FileNum] = t
2210+
}
2211+
for n, t := range addedTables {
2212+
if m[n] != nil {
2213+
panic("added table that already exists in old level")
2214+
}
2215+
m[n] = t
2216+
}
2217+
for n, t := range deletedTables {
2218+
if m[n] == nil {
2219+
panic("deleted table not in old level")
2220+
}
2221+
if m[n] != t {
2222+
panic("deleted table does not match old level")
2223+
}
2224+
delete(m, n)
2225+
}
2226+
iter = newLevel.Iter()
2227+
for t := iter.First(); t != nil; t = iter.Next() {
2228+
if m[t.FileNum] == nil {
2229+
panic("unknown table in new level")
2230+
}
2231+
delete(m, t.FileNum)
2232+
}
2233+
if len(m) != 0 {
2234+
panic("tables missing from the new level")
2235+
}
2236+
}

internal/manifest/l0_sublevels_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ func readManifest(filename string) (*Version, error) {
3232
}
3333
defer f.Close()
3434
rr := record.NewReader(f, 0 /* logNum */)
35-
v := NewVersion(base.DefaultComparer)
35+
36+
l0Organizer := NewL0Organizer(base.DefaultComparer, 10<<20 /* flushSplitBytes */)
37+
v := NewInitialVersion(base.DefaultComparer, l0Organizer)
3638
addedByFileNum := make(map[base.FileNum]*TableMetadata)
3739
for {
3840
r, err := rr.Next()
@@ -51,7 +53,7 @@ func readManifest(filename string) (*Version, error) {
5153
if err := bve.Accumulate(&ve); err != nil {
5254
return nil, err
5355
}
54-
if v, err = bve.Apply(v, 10<<20, 32000); err != nil {
56+
if v, err = bve.Apply(v, l0Organizer, 32000); err != nil {
5557
return nil, err
5658
}
5759
}
@@ -318,7 +320,7 @@ func TestL0Sublevels(t *testing.T) {
318320
levelFiles: explicitSublevels,
319321
cmp: base.DefaultComparer.Compare,
320322
formatKey: base.DefaultFormatter,
321-
levelMetadata: &levelMetadata,
323+
levelMetadata: levelMetadata,
322324
}
323325
for _, files := range explicitSublevels {
324326
sublevels.Levels = append(sublevels.Levels, NewLevelSliceSpecificOrder(files))

0 commit comments

Comments
 (0)