Skip to content

Commit 8026c97

Browse files
committed
db: consolidate ScanInternal code into scan_internal.go
Move code specific to ScanInternal into scan_internal.go.
1 parent 55d1733 commit 8026c97

File tree

2 files changed

+153
-152
lines changed

2 files changed

+153
-152
lines changed

db.go

Lines changed: 0 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,158 +1149,6 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
11491149
return dbi
11501150
}
11511151

1152-
// ScanInternal scans all internal keys within the specified bounds, truncating
1153-
// any rangedels and rangekeys to those bounds if they span past them. For use
1154-
// when an external user needs to be aware of all internal keys that make up a
1155-
// key range.
1156-
//
1157-
// Keys deleted by range deletions must not be returned or exposed by this
1158-
// method, while the range deletion deleting that key must be exposed using
1159-
// visitRangeDel. Keys that would be masked by range key masking (if an
1160-
// appropriate prefix were set) should be exposed, alongside the range key
1161-
// that would have masked it. This method also collapses all point keys into
1162-
// one InternalKey; so only one internal key at most per user key is returned
1163-
// to visitPointKey.
1164-
//
1165-
// If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
1166-
// mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
1167-
// their metadatas truncated to [lower, upper) and passed into visitSharedFile.
1168-
// ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
1169-
// sstable in L5 or L6 is found that is not in shared storage according to
1170-
// provider.IsShared, or an sstable in those levels contains a newer key than the
1171-
// snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
1172-
// of when this could happen could be if Pebble started writing sstables before a
1173-
// creator ID was set (as creator IDs are necessary to enable shared storage)
1174-
// resulting in some lower level SSTs being on non-shared storage. Skip-shared
1175-
// iteration is invalid in those cases.
1176-
func (d *DB) ScanInternal(ctx context.Context, opts ScanInternalOptions) error {
1177-
iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, &opts)
1178-
if err != nil {
1179-
return err
1180-
}
1181-
defer iter.close()
1182-
return scanInternalImpl(ctx, iter, &opts)
1183-
}
1184-
1185-
// newInternalIter constructs and returns a new scanInternalIterator on this db.
1186-
// If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
1187-
// to the internal iterator.
1188-
//
1189-
// TODO(bilal): This method has a lot of similarities with db.newIter as well as
1190-
// finishInitializingIter. Both pairs of methods should be refactored to reduce
1191-
// this duplication.
1192-
func (d *DB) newInternalIter(
1193-
ctx context.Context, sOpts snapshotIterOpts, o *ScanInternalOptions,
1194-
) (*scanInternalIterator, error) {
1195-
if err := d.closed.Load(); err != nil {
1196-
panic(err)
1197-
}
1198-
// Grab and reference the current readState. This prevents the underlying
1199-
// files in the associated version from being deleted if there is a current
1200-
// compaction. The readState is unref'd by Iterator.Close().
1201-
var readState *readState
1202-
var vers *manifest.Version
1203-
if sOpts.vers == nil {
1204-
if sOpts.readState != nil {
1205-
readState = sOpts.readState
1206-
readState.ref()
1207-
vers = readState.current
1208-
} else {
1209-
readState = d.loadReadState()
1210-
vers = readState.current
1211-
}
1212-
} else {
1213-
vers = sOpts.vers
1214-
sOpts.vers.Ref()
1215-
}
1216-
1217-
// Determine the seqnum to read at after grabbing the read state (current and
1218-
// memtables) above.
1219-
seqNum := sOpts.seqNum
1220-
if seqNum == 0 {
1221-
seqNum = d.mu.versions.visibleSeqNum.Load()
1222-
}
1223-
1224-
// Bundle various structures under a single umbrella in order to allocate
1225-
// them together.
1226-
buf := iterAllocPool.Get().(*iterAlloc)
1227-
dbi := &scanInternalIterator{
1228-
ctx: ctx,
1229-
db: d,
1230-
comparer: d.opts.Comparer,
1231-
merge: d.opts.Merger.Merge,
1232-
readState: readState,
1233-
version: sOpts.vers,
1234-
alloc: buf,
1235-
newIters: d.newIters,
1236-
newIterRangeKey: d.tableNewRangeKeyIter,
1237-
seqNum: seqNum,
1238-
mergingIter: &buf.merging,
1239-
}
1240-
dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{})
1241-
1242-
dbi.opts = *o
1243-
dbi.opts.logger = d.opts.Logger
1244-
if d.opts.private.disableLazyCombinedIteration {
1245-
dbi.opts.disableLazyCombinedIteration = true
1246-
}
1247-
return finishInitializingInternalIter(buf, dbi)
1248-
}
1249-
1250-
type internalIterOpts struct {
1251-
// if compaction is set, sstable-level iterators will be created using
1252-
// NewCompactionIter; these iterators have a more constrained interface
1253-
// and are optimized for the sequential scan of a compaction.
1254-
compaction bool
1255-
readEnv sstable.ReadEnv
1256-
boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
1257-
// blobValueFetcher is the base.ValueFetcher to use when constructing
1258-
// internal values to represent values stored externally in blob files.
1259-
blobValueFetcher base.ValueFetcher
1260-
}
1261-
1262-
func finishInitializingInternalIter(
1263-
buf *iterAlloc, i *scanInternalIterator,
1264-
) (*scanInternalIterator, error) {
1265-
// Short-hand.
1266-
var memtables flushableList
1267-
if i.readState != nil {
1268-
memtables = i.readState.memtables
1269-
}
1270-
// We only need to read from memtables which contain sequence numbers older
1271-
// than seqNum. Trim off newer memtables.
1272-
for j := len(memtables) - 1; j >= 0; j-- {
1273-
if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
1274-
break
1275-
}
1276-
memtables = memtables[:j]
1277-
}
1278-
i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
1279-
1280-
if err := i.constructPointIter(i.opts.Category, memtables, buf); err != nil {
1281-
return nil, err
1282-
}
1283-
1284-
// For internal iterators, we skip the lazy combined iteration optimization
1285-
// entirely, and create the range key iterator stack directly.
1286-
i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
1287-
if err := i.constructRangeKeyIter(); err != nil {
1288-
return nil, err
1289-
}
1290-
1291-
// Wrap the point iterator (currently i.iter) with an interleaving
1292-
// iterator that interleaves range keys pulled from
1293-
// i.rangeKey.rangeKeyIter.
1294-
i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
1295-
keyspan.InterleavingIterOpts{
1296-
LowerBound: i.opts.LowerBound,
1297-
UpperBound: i.opts.UpperBound,
1298-
})
1299-
i.iter = &i.rangeKey.iiter
1300-
1301-
return i, nil
1302-
}
1303-
13041152
func (i *Iterator) constructPointIter(
13051153
ctx context.Context, memtables flushableList, buf *iterAlloc,
13061154
) {

scan_internal.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/pebble/internal/treeprinter"
1919
"github.com/cockroachdb/pebble/objstorage"
2020
"github.com/cockroachdb/pebble/objstorage/remote"
21+
"github.com/cockroachdb/pebble/sstable"
2122
"github.com/cockroachdb/pebble/sstable/blob"
2223
"github.com/cockroachdb/pebble/sstable/block"
2324
)
@@ -105,6 +106,158 @@ func (s *SharedSSTMeta) cloneFromFileMeta(f *manifest.TableMetadata) {
105106
}
106107
}
107108

109+
// ScanInternal scans all internal keys within the specified bounds, truncating
110+
// any rangedels and rangekeys to those bounds if they span past them. For use
111+
// when an external user needs to be aware of all internal keys that make up a
112+
// key range.
113+
//
114+
// Keys deleted by range deletions must not be returned or exposed by this
115+
// method, while the range deletion deleting that key must be exposed using
116+
// visitRangeDel. Keys that would be masked by range key masking (if an
117+
// appropriate prefix were set) should be exposed, alongside the range key
118+
// that would have masked it. This method also collapses all point keys into
119+
// one InternalKey; so only one internal key at most per user key is returned
120+
// to visitPointKey.
121+
//
122+
// If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
123+
// mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
124+
// their metadatas truncated to [lower, upper) and passed into visitSharedFile.
125+
// ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
126+
// sstable in L5 or L6 is found that is not in shared storage according to
127+
// provider.IsShared, or an sstable in those levels contains a newer key than the
128+
// snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
129+
// of when this could happen could be if Pebble started writing sstables before a
130+
// creator ID was set (as creator IDs are necessary to enable shared storage)
131+
// resulting in some lower level SSTs being on non-shared storage. Skip-shared
132+
// iteration is invalid in those cases.
133+
func (d *DB) ScanInternal(ctx context.Context, opts ScanInternalOptions) error {
134+
iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, &opts)
135+
if err != nil {
136+
return err
137+
}
138+
defer iter.close()
139+
return scanInternalImpl(ctx, iter, &opts)
140+
}
141+
142+
// newInternalIter constructs and returns a new scanInternalIterator on this db.
143+
// If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
144+
// to the internal iterator.
145+
//
146+
// TODO(bilal): This method has a lot of similarities with db.newIter as well as
147+
// finishInitializingIter. Both pairs of methods should be refactored to reduce
148+
// this duplication.
149+
func (d *DB) newInternalIter(
150+
ctx context.Context, sOpts snapshotIterOpts, o *ScanInternalOptions,
151+
) (*scanInternalIterator, error) {
152+
if err := d.closed.Load(); err != nil {
153+
panic(err)
154+
}
155+
// Grab and reference the current readState. This prevents the underlying
156+
// files in the associated version from being deleted if there is a current
157+
// compaction. The readState is unref'd by Iterator.Close().
158+
var readState *readState
159+
var vers *manifest.Version
160+
if sOpts.vers == nil {
161+
if sOpts.readState != nil {
162+
readState = sOpts.readState
163+
readState.ref()
164+
vers = readState.current
165+
} else {
166+
readState = d.loadReadState()
167+
vers = readState.current
168+
}
169+
} else {
170+
vers = sOpts.vers
171+
sOpts.vers.Ref()
172+
}
173+
174+
// Determine the seqnum to read at after grabbing the read state (current and
175+
// memtables) above.
176+
seqNum := sOpts.seqNum
177+
if seqNum == 0 {
178+
seqNum = d.mu.versions.visibleSeqNum.Load()
179+
}
180+
181+
// Bundle various structures under a single umbrella in order to allocate
182+
// them together.
183+
buf := iterAllocPool.Get().(*iterAlloc)
184+
dbi := &scanInternalIterator{
185+
ctx: ctx,
186+
db: d,
187+
comparer: d.opts.Comparer,
188+
merge: d.opts.Merger.Merge,
189+
readState: readState,
190+
version: sOpts.vers,
191+
alloc: buf,
192+
newIters: d.newIters,
193+
newIterRangeKey: d.tableNewRangeKeyIter,
194+
seqNum: seqNum,
195+
mergingIter: &buf.merging,
196+
}
197+
dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{})
198+
199+
dbi.opts = *o
200+
dbi.opts.logger = d.opts.Logger
201+
if d.opts.private.disableLazyCombinedIteration {
202+
dbi.opts.disableLazyCombinedIteration = true
203+
}
204+
return finishInitializingInternalIter(buf, dbi)
205+
}
206+
207+
type internalIterOpts struct {
208+
// if compaction is set, sstable-level iterators will be created using
209+
// NewCompactionIter; these iterators have a more constrained interface
210+
// and are optimized for the sequential scan of a compaction.
211+
compaction bool
212+
readEnv sstable.ReadEnv
213+
boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
214+
// blobValueFetcher is the base.ValueFetcher to use when constructing
215+
// internal values to represent values stored externally in blob files.
216+
blobValueFetcher base.ValueFetcher
217+
}
218+
219+
func finishInitializingInternalIter(
220+
buf *iterAlloc, i *scanInternalIterator,
221+
) (*scanInternalIterator, error) {
222+
// Short-hand.
223+
var memtables flushableList
224+
if i.readState != nil {
225+
memtables = i.readState.memtables
226+
}
227+
// We only need to read from memtables which contain sequence numbers older
228+
// than seqNum. Trim off newer memtables.
229+
for j := len(memtables) - 1; j >= 0; j-- {
230+
if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
231+
break
232+
}
233+
memtables = memtables[:j]
234+
}
235+
i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
236+
237+
if err := i.constructPointIter(i.opts.Category, memtables, buf); err != nil {
238+
return nil, err
239+
}
240+
241+
// For internal iterators, we skip the lazy combined iteration optimization
242+
// entirely, and create the range key iterator stack directly.
243+
i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
244+
if err := i.constructRangeKeyIter(); err != nil {
245+
return nil, err
246+
}
247+
248+
// Wrap the point iterator (currently i.iter) with an interleaving
249+
// iterator that interleaves range keys pulled from
250+
// i.rangeKey.rangeKeyIter.
251+
i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
252+
keyspan.InterleavingIterOpts{
253+
LowerBound: i.opts.LowerBound,
254+
UpperBound: i.opts.UpperBound,
255+
})
256+
i.iter = &i.rangeKey.iiter
257+
258+
return i, nil
259+
}
260+
108261
type sharedByLevel []SharedSSTMeta
109262

110263
func (s sharedByLevel) Len() int { return len(s) }

0 commit comments

Comments
 (0)