Skip to content

Commit bbfc14b

Browse files
committed
sstable: reimplement lazy loading of the index block in the single-level iterator
This commit reimplements lazy loading for single-level sstable iterators, fixing critical issues from the previous attempt (527eebf) which was reverted due to data corruption in stress tests. This reimplementation maintains the same I/O optimization benefits as the original attempt while ensuring data integrity through robust state management and error handling. While the behavior is transparent to users, the internal semantics are different now. The order of events in some data-driven tests has to change accordingly. See changes in `checkpoint`, `cleaner`, and `event_listener`. There are also tests added to demonstrate how the bloom filter can benefit from lazy loading by avoiding loading the index block entirely. See changes in `flushable_ingest`, and `reader_lazy_loading`. Implements #3248
1 parent a76bd2e commit bbfc14b

File tree

10 files changed

+1078
-50
lines changed

10 files changed

+1078
-50
lines changed

sstable/data_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ func runIterCmd(
360360
}
361361
fmt.Fprintf(&b, "| topLevelIndex.isDataInvalidated()=%t\n", twoLevelIter.topLevelIndex.IsDataInvalidated())
362362
}
363-
if si.index.Valid() {
363+
if si != nil && si.index.Valid() {
364364
fmt.Fprintf(&b, "| index.Separator() = %q\n", si.index.Separator())
365365
bhp, err := si.index.BlockHandleWithProperties()
366366
if err != nil {
@@ -372,12 +372,14 @@ func runIterCmd(
372372
} else {
373373
fmt.Fprintf(&b, "| index iter invalid\n")
374374
}
375-
fmt.Fprintf(&b, "| index.isDataInvalidated()=%t\n", si.index.IsDataInvalidated())
376-
fmt.Fprintf(&b, "| data.isDataInvalidated()=%t\n", si.data.IsDataInvalidated())
377-
fmt.Fprintf(&b, "| hideObsoletePoints = %t\n", si.transforms.HideObsoletePoints)
378-
fmt.Fprintf(&b, "| dataBH = (Offset: %d, Length: %d)\n", si.dataBH.Offset, si.dataBH.Length)
379-
fmt.Fprintf(&b, "| (boundsCmp,positionedUsingLatestBounds) = (%d,%t)\n", si.boundsCmp, si.positionedUsingLatestBounds)
380-
fmt.Fprintf(&b, "| exhaustedBounds = %d\n", si.exhaustedBounds)
375+
if si != nil {
376+
fmt.Fprintf(&b, "| index.isDataInvalidated()=%t\n", si.index.IsDataInvalidated())
377+
fmt.Fprintf(&b, "| data.isDataInvalidated()=%t\n", si.data.IsDataInvalidated())
378+
fmt.Fprintf(&b, "| hideObsoletePoints = %t\n", si.transforms.HideObsoletePoints)
379+
fmt.Fprintf(&b, "| dataBH = (Offset: %d, Length: %d)\n", si.dataBH.Offset, si.dataBH.Length)
380+
fmt.Fprintf(&b, "| (boundsCmp,positionedUsingLatestBounds) = (%d,%t)\n", si.boundsCmp, si.positionedUsingLatestBounds)
381+
fmt.Fprintf(&b, "| exhaustedBounds = %d\n", si.exhaustedBounds)
382+
}
381383

382384
continue
383385
}

sstable/reader_iter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ var (
151151
singleLevelIterRowBlockPool sync.Pool // *singleLevelIteratorRowBlocks
152152
twoLevelIterRowBlockPool sync.Pool // *twoLevelIteratorRowBlocks
153153
singleLevelIterColumnBlockPool sync.Pool // *singleLevelIteratorColumnBlocks
154-
twoLevelIterColumnBlockPool sync.Pool // *singleLevelIteratorColumnBlocks
154+
twoLevelIterColumnBlockPool sync.Pool // *twoLevelIteratorColumnBlocks
155155
)
156156

157157
func init() {

sstable/reader_iter_single_lvl.go

Lines changed: 66 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
165165
useFilterBlock bool
166166
lastBloomFilterMatched bool
167167

168+
// indexLoaded is set to true if the index block load operation completed
169+
// successfully.
170+
indexLoaded bool
171+
168172
transforms IterTransforms
169173

170174
// All fields above this field are cleared when resetting the iterator for reuse.
@@ -191,9 +195,8 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
191195
// singleLevelIterator implements the base.InternalIterator interface.
192196
var _ base.InternalIterator = (*singleLevelIteratorRowBlocks)(nil)
193197

194-
// newColumnBlockSingleLevelIterator reads the index block and creates and
195-
// initializes a singleLevelIterator over an sstable with column-oriented data
196-
// blocks.
198+
// newColumnBlockSingleLevelIterator creates a singleLevelIterator over an
199+
// sstable with column-oriented data blocks that loads the index block lazily.
197200
//
198201
// Note that lower, upper are iterator bounds and are separate from virtual
199202
// sstable bounds. If the virtualState passed in is not nil, then virtual
@@ -215,20 +218,12 @@ func newColumnBlockSingleLevelIterator(
215218
i.vbRH = r.blockReader.UsePreallocatedReadHandle(objstorage.NoReadBefore, &i.vbRHPrealloc)
216219
}
217220
i.data.InitOnce(r.keySchema, r.Comparer, &i.internalValueConstructor)
218-
indexH, err := r.readTopLevelIndexBlock(ctx, i.readEnv.Block, i.indexFilterRH)
219-
if err == nil {
220-
err = i.index.InitHandle(r.Comparer, indexH, opts.Transforms)
221-
}
222-
if err != nil {
223-
_ = i.Close()
224-
return nil, err
225-
}
221+
226222
return i, nil
227223
}
228224

229-
// newRowBlockSingleLevelIterator reads the index block and creates and
230-
// initializes a singleLevelIterator over an sstable with row-oriented data
231-
// blocks.
225+
// newRowBlockSingleLevelIterator creates a singleLevelIterator over an
226+
// sstable with row-oriented data blocks that loads the index block lazily.
232227
//
233228
// Note that lower, upper are iterator bounds and are separate from virtual
234229
// sstable bounds. If the virtualState passed in is not nil, then virtual
@@ -256,20 +251,13 @@ func newRowBlockSingleLevelIterator(
256251
i.data.SetHasValuePrefix(true)
257252
}
258253

259-
indexH, err := r.readTopLevelIndexBlock(ctx, i.readEnv.Block, i.indexFilterRH)
260-
if err == nil {
261-
err = i.index.InitHandle(r.Comparer, indexH, opts.Transforms)
262-
}
263-
if err != nil {
264-
_ = i.Close()
265-
return nil, err
266-
}
267254
return i, nil
268255
}
269256

270257
// init initializes the singleLevelIterator struct. It does not read the index.
271258
func (i *singleLevelIterator[I, PI, D, PD]) init(ctx context.Context, r *Reader, opts IterOptions) {
272259
i.inPool = false
260+
i.indexLoaded = false
273261
i.ctx = ctx
274262
i.lower = opts.Lower
275263
i.upper = opts.Upper
@@ -316,7 +304,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) SetupForCompaction() {
316304

317305
const clearLen = unsafe.Offsetof(singleLevelIteratorRowBlocks{}.clearForResetBoundary)
318306

319-
// Assert that clearLen is consistent betwen the row and columnar implementations.
307+
// Assert that clearLen is consistent between the row and columnar implementations.
320308
const clearLenColBlocks = unsafe.Offsetof(singleLevelIteratorColumnBlocks{}.clearForResetBoundary)
321309
const _ uintptr = clearLen - clearLenColBlocks
322310
const _ uintptr = clearLenColBlocks - clearLen
@@ -451,6 +439,11 @@ func (i *singleLevelIterator[I, PI, P, PD]) SetContext(ctx context.Context) {
451439
// unpositioned. If unsuccessful, it sets i.err to any error encountered, which
452440
// may be nil if we have simply exhausted the entire table.
453441
func (i *singleLevelIterator[I, PI, P, PD]) loadDataBlock(dir int8) loadBlockResult {
442+
if !i.ensureIndexLoaded() {
443+
// Ensure the data block iterator is invalidated
444+
PD(&i.data).Invalidate()
445+
return loadBlockFailed
446+
}
454447
if !PI(&i.index).Valid() {
455448
// Ensure the data block iterator is invalidated even if loading of the block
456449
// fails.
@@ -681,6 +674,9 @@ func (i *singleLevelIterator[I, PI, D, PD]) SeekGE(
681674
func (i *singleLevelIterator[I, PI, D, PD]) seekGEHelper(
682675
key []byte, boundsCmp int, flags base.SeekGEFlags,
683676
) *base.InternalKV {
677+
if !i.ensureIndexLoaded() {
678+
return nil
679+
}
684680
// Invariant: trySeekUsingNext => !i.data.isDataInvalidated() && i.exhaustedBounds != +1
685681

686682
// SeekGE performs various step-instead-of-seeking optimizations: eg enabled
@@ -904,6 +900,10 @@ func (i *singleLevelIterator[I, PI, D, PD]) virtualLast() *base.InternalKV {
904900
// uses of this method in the future. Does a SeekLE on the upper bound of the
905901
// file/iterator.
906902
func (i *singleLevelIterator[I, PI, D, PD]) virtualLastSeekLE() *base.InternalKV {
903+
if !i.ensureIndexLoaded() {
904+
return nil
905+
}
906+
907907
// Callers of SeekLE don't know about virtual sstable bounds, so we may
908908
// have to internally restrict the bounds.
909909
//
@@ -1013,6 +1013,10 @@ func (i *singleLevelIterator[I, PI, D, PD]) SeekLT(
10131013
// Seek optimization only applies until iterator is first positioned after SetBounds.
10141014
i.boundsCmp = 0
10151015

1016+
if !i.ensureIndexLoaded() {
1017+
return nil
1018+
}
1019+
10161020
// Seeking operations perform various step-instead-of-seeking optimizations:
10171021
// eg by considering monotonically increasing bounds (i.boundsCmp).
10181022

@@ -1120,6 +1124,10 @@ func (i *singleLevelIterator[I, PI, D, PD]) firstInternal() *base.InternalKV {
11201124
// Seek optimization only applies until iterator is first positioned after SetBounds.
11211125
i.boundsCmp = 0
11221126

1127+
if !i.ensureIndexLoaded() {
1128+
return nil
1129+
}
1130+
11231131
if !PI(&i.index).First() {
11241132
PD(&i.data).Invalidate()
11251133
return nil
@@ -1184,6 +1192,10 @@ func (i *singleLevelIterator[I, PI, D, PD]) lastInternal() *base.InternalKV {
11841192
// Seek optimization only applies until iterator is first positioned after SetBounds.
11851193
i.boundsCmp = 0
11861194

1195+
if !i.ensureIndexLoaded() {
1196+
return nil
1197+
}
1198+
11871199
if !PI(&i.index).Last() {
11881200
PD(&i.data).Invalidate()
11891201
return nil
@@ -1272,6 +1284,9 @@ func (i *singleLevelIterator[I, PI, D, PD]) NextPrefix(succKey []byte) *base.Int
12721284
// Did not find prefix in the existing data block. This is the slow-path
12731285
// where we effectively seek the iterator.
12741286
// The key is likely to be in the next data block, so try one step.
1287+
if !i.ensureIndexLoaded() {
1288+
return nil
1289+
}
12751290
if !PI(&i.index).Next() {
12761291
// The target key is greater than any key in the index block.
12771292
// Invalidate the block iterator so that a subsequent call to Prev()
@@ -1343,6 +1358,9 @@ func (i *singleLevelIterator[I, PI, D, PD]) Prev() *base.InternalKV {
13431358

13441359
func (i *singleLevelIterator[I, PI, D, PD]) skipForward() *base.InternalKV {
13451360
for {
1361+
if !i.ensureIndexLoaded() {
1362+
return nil
1363+
}
13461364
if !PI(&i.index).Next() {
13471365
PD(&i.data).Invalidate()
13481366
break
@@ -1421,6 +1439,9 @@ func (i *singleLevelIterator[I, PI, D, PD]) skipForward() *base.InternalKV {
14211439

14221440
func (i *singleLevelIterator[I, PI, D, PD]) skipBackward() *base.InternalKV {
14231441
for {
1442+
if !i.ensureIndexLoaded() {
1443+
return nil
1444+
}
14241445
if !PI(&i.index).Prev() {
14251446
PD(&i.data).Invalidate()
14261447
break
@@ -1545,3 +1566,25 @@ func (i *singleLevelIterator[I, PI, D, PD]) String() string {
15451566
func (i *singleLevelIterator[I, PI, D, PD]) DebugTree(tp treeprinter.Node) {
15461567
tp.Childf("%T(%p) fileNum=%s", i, i, i.String())
15471568
}
1569+
1570+
func (i *singleLevelIterator[I, PI, D, PD]) ensureIndexLoaded() bool {
1571+
if i.indexLoaded {
1572+
return true
1573+
}
1574+
1575+
// Perform the deferred index loading calls
1576+
indexH, err := i.reader.readTopLevelIndexBlock(i.ctx, i.readEnv.Block, i.indexFilterRH)
1577+
if err != nil {
1578+
i.err = err
1579+
return false
1580+
}
1581+
1582+
err = PI(&i.index).InitHandle(i.reader.Comparer, indexH, i.transforms)
1583+
if err != nil {
1584+
i.err = err
1585+
return false
1586+
}
1587+
1588+
i.indexLoaded = true
1589+
return true
1590+
}

0 commit comments

Comments
 (0)