Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/keyspan: support interleaving end keys #3536

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 63 additions & 42 deletions internal/keyspan/interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ type SpanMask interface {
// InterleavedIter does not interleave synthetic markers for spans that do not
// contain any keys.
//
// When InterleavingIterOpts.InterleaveEndKeys is set, in addition to
// interleaving start keys, the interleaving iterator will interleave end
// boundary keys (also at the maximumal sequence number). At these end boundary
// positions, Span() will return the span to which the end boundary belongs.
//
// # SpanMask
//
// InterelavingIter takes a SpanMask parameter that may be used to configure the
Expand All @@ -98,10 +103,7 @@ type InterleavingIter struct {
comparer *base.Comparer
pointIter base.InternalIterator
keyspanIter FragmentIterator
mask SpanMask

// lower and upper hold the iteration bounds set through SetBounds.
lower, upper []byte
opts InterleavingIterOpts
// keyBuf is used to copy SeekGE or SeekPrefixGE arguments when they're used
// to truncate a span. The byte slices backing a SeekGE/SeekPrefixGE search
// keys can come directly from the end user, so they're copied into keyBuf
Expand Down Expand Up @@ -193,6 +195,10 @@ var _ base.InternalIterator = &InterleavingIter{}
type InterleavingIterOpts struct {
Mask SpanMask
LowerBound, UpperBound []byte
// InterleaveEndKeys configures the interleaving iterator to interleave the
// end keys of spans (in addition to the start keys, which are always
// interleaved).
InterleaveEndKeys bool
}

// Init initializes the InterleavingIter to interleave point keys from pointIter
Expand All @@ -214,9 +220,7 @@ func (i *InterleavingIter) Init(
comparer: comparer,
pointIter: pointIter,
keyspanIter: keyspanIter,
mask: opts.Mask,
lower: opts.LowerBound,
upper: opts.UpperBound,
opts: opts,
}
}

Expand Down Expand Up @@ -261,7 +265,7 @@ func (i *InterleavingIter) InitSeekLT(key []byte, pointKV *base.InternalKV) *bas
i.savePoint(pointKV)
i.keyspanSeekLT(key)
i.computeLargestPos()
return i.yieldPosition(i.lower, i.prevPos)
return i.yieldPosition(i.opts.LowerBound, i.prevPos)
}

// SeekGE implements (base.InternalIterator).SeekGE.
Expand Down Expand Up @@ -394,7 +398,7 @@ func (i *InterleavingIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.Inte

i.dir = -1
i.computeLargestPos()
return i.yieldPosition(i.lower, i.prevPos)
return i.yieldPosition(i.opts.LowerBound, i.prevPos)
}

// First implements (base.InternalIterator).First.
Expand All @@ -407,7 +411,7 @@ func (i *InterleavingIter) First() *base.InternalKV {
i.savedKeyspan()
i.dir = +1
i.computeSmallestPos()
return i.yieldPosition(i.lower, i.nextPos)
return i.yieldPosition(i.opts.LowerBound, i.nextPos)
}

// Last implements (base.InternalIterator).Last.
Expand All @@ -420,7 +424,7 @@ func (i *InterleavingIter) Last() *base.InternalKV {
i.savedKeyspan()
i.dir = -1
i.computeLargestPos()
return i.yieldPosition(i.lower, i.prevPos)
return i.yieldPosition(i.opts.LowerBound, i.prevPos)
}

// Next implements (base.InternalIterator).Next.
Expand All @@ -429,7 +433,7 @@ func (i *InterleavingIter) Next() *base.InternalKV {
// Switching directions.
i.dir = +1

if i.mask != nil {
if i.opts.Mask != nil {
// Clear the mask while we reposition the point iterator. While
// switching directions, we may move the point iterator outside of
// i.span's bounds.
Expand Down Expand Up @@ -471,7 +475,7 @@ func (i *InterleavingIter) Next() *base.InternalKV {
// Fallthrough to calling i.nextPos.
}
i.nextPos()
return i.yieldPosition(i.lower, i.nextPos)
return i.yieldPosition(i.opts.LowerBound, i.nextPos)
}

// NextPrefix implements (base.InternalIterator).NextPrefix.
Expand All @@ -497,7 +501,7 @@ func (i *InterleavingIter) NextPrefix(succKey []byte) *base.InternalKV {
case posKeyspanStart, posKeyspanEnd:
i.nextPos()
}
return i.yieldPosition(i.lower, i.nextPos)
return i.yieldPosition(i.opts.LowerBound, i.nextPos)
}

// Prev implements (base.InternalIterator).Prev.
Expand All @@ -506,7 +510,7 @@ func (i *InterleavingIter) Prev() *base.InternalKV {
// Switching directions.
i.dir = -1

if i.mask != nil {
if i.opts.Mask != nil {
// Clear the mask while we reposition the point iterator. While
// switching directions, we may move the point iterator outside of
// i.span's bounds.
Expand Down Expand Up @@ -569,7 +573,7 @@ func (i *InterleavingIter) Prev() *base.InternalKV {
// Fallthrough to calling i.prevPos.
}
i.prevPos()
return i.yieldPosition(i.lower, i.prevPos)
return i.yieldPosition(i.opts.LowerBound, i.prevPos)
}

// computeSmallestPos sets i.{pos,withinSpan} to:
Expand Down Expand Up @@ -766,9 +770,9 @@ func (i *InterleavingIter) yieldPosition(lowerBound []byte, advance func()) *bas
panic("i.pointKV is nil")
}

if i.mask != nil {
if i.opts.Mask != nil {
i.maybeUpdateMask()
if i.withinSpan && i.mask.SkipPoint(i.pointKV.K.UserKey) {
if i.withinSpan && i.opts.Mask.SkipPoint(i.pointKV.K.UserKey) {
// The span covers the point key. If a SkipPoint hook is
// configured, ask it if we should skip this point key.
if i.prefix != nil {
Expand Down Expand Up @@ -797,16 +801,19 @@ func (i *InterleavingIter) yieldPosition(lowerBound []byte, advance func()) *bas
}
return i.yieldPointKey()
case posKeyspanEnd:
// Don't interleave end keys; just advance.
advance()
continue
if !i.opts.InterleaveEndKeys {
// Don't interleave end keys; just advance.
advance()
continue
}
return i.yieldSyntheticSpanEndMarker()
case posKeyspanStart:
// Don't interleave an empty span.
if i.span.Empty() {
advance()
continue
}
return i.yieldSyntheticSpanMarker(lowerBound)
return i.yieldSyntheticSpanStartMarker(lowerBound)
default:
panic(fmt.Sprintf("unexpected interleavePos=%d", i.pos))
}
Expand Down Expand Up @@ -848,7 +855,7 @@ func (i *InterleavingIter) saveSpanForward(span *Span, err error) {
return
}
// Check the upper bound if we have one.
if i.upper != nil && i.cmp(i.span.Start, i.upper) >= 0 {
if i.opts.UpperBound != nil && i.cmp(i.span.Start, i.opts.UpperBound) >= 0 {
i.span = nil
return
}
Expand All @@ -861,17 +868,17 @@ func (i *InterleavingIter) saveSpanForward(span *Span, err error) {

// NB: These truncations don't require setting `keyspanMarkerTruncated`:
// That flag only applies to truncated span marker keys.
if i.lower != nil && i.cmp(i.span.Start, i.lower) < 0 {
if i.opts.LowerBound != nil && i.cmp(i.span.Start, i.opts.LowerBound) < 0 {
i.truncated = true
i.truncatedSpan = *i.span
i.truncatedSpan.Start = i.lower
i.truncatedSpan.Start = i.opts.LowerBound
}
if i.upper != nil && i.cmp(i.upper, i.span.End) < 0 {
if i.opts.UpperBound != nil && i.cmp(i.opts.UpperBound, i.span.End) < 0 {
if !i.truncated {
i.truncated = true
i.truncatedSpan = *i.span
}
i.truncatedSpan.End = i.upper
i.truncatedSpan.End = i.opts.UpperBound
}
// If this is a part of a SeekPrefixGE call, we may also need to truncate to
// the prefix's bounds.
Expand Down Expand Up @@ -904,7 +911,7 @@ func (i *InterleavingIter) saveSpanBackward(span *Span, err error) {
}

// Check the lower bound if we have one.
if i.lower != nil && i.cmp(i.span.End, i.lower) <= 0 {
if i.opts.LowerBound != nil && i.cmp(i.span.End, i.opts.LowerBound) <= 0 {
i.span = nil
return
}
Expand All @@ -917,17 +924,17 @@ func (i *InterleavingIter) saveSpanBackward(span *Span, err error) {

// NB: These truncations don't require setting `keyspanMarkerTruncated`:
// That flag only applies to truncated span marker keys.
if i.lower != nil && i.cmp(i.span.Start, i.lower) < 0 {
if i.opts.LowerBound != nil && i.cmp(i.span.Start, i.opts.LowerBound) < 0 {
i.truncated = true
i.truncatedSpan = *i.span
i.truncatedSpan.Start = i.lower
i.truncatedSpan.Start = i.opts.LowerBound
}
if i.upper != nil && i.cmp(i.upper, i.span.End) < 0 {
if i.opts.UpperBound != nil && i.cmp(i.opts.UpperBound, i.span.End) < 0 {
if !i.truncated {
i.truncated = true
i.truncatedSpan = *i.span
}
i.truncatedSpan.End = i.upper
i.truncatedSpan.End = i.opts.UpperBound
}
if i.truncated && i.comparer.Equal(i.truncatedSpan.Start, i.truncatedSpan.End) {
i.span = nil
Expand All @@ -944,7 +951,7 @@ func (i *InterleavingIter) yieldPointKey() *base.InternalKV {
return i.verify(i.pointKV)
}

func (i *InterleavingIter) yieldSyntheticSpanMarker(lowerBound []byte) *base.InternalKV {
func (i *InterleavingIter) yieldSyntheticSpanStartMarker(lowerBound []byte) *base.InternalKV {
i.spanMarker.K.UserKey = i.startKey()
i.spanMarker.K.Trailer = base.MakeTrailer(base.InternalKeySeqNumMax, i.span.Keys[0].Kind())

Expand All @@ -958,7 +965,7 @@ func (i *InterleavingIter) yieldSyntheticSpanMarker(lowerBound []byte) *base.Int
// bound for truncating a span. The span a-z will be truncated to [k,
// z). If i.upper == k, we'd mistakenly try to return a span [k, k), an
// invariant violation.
if i.comparer.Equal(lowerBound, i.upper) {
if i.comparer.Equal(lowerBound, i.opts.UpperBound) {
return i.yieldNil()
}

Expand All @@ -981,6 +988,12 @@ func (i *InterleavingIter) yieldSyntheticSpanMarker(lowerBound []byte) *base.Int
return i.verify(&i.spanMarker)
}

func (i *InterleavingIter) yieldSyntheticSpanEndMarker() *base.InternalKV {
i.spanMarker.K.UserKey = i.endKey()
i.spanMarker.K.Trailer = base.MakeTrailer(base.InternalKeySeqNumMax, i.span.Keys[0].Kind())
return i.verify(&i.spanMarker)
}

func (i *InterleavingIter) disablePrefixMode() {
if i.prefix != nil {
i.prefix = nil
Expand All @@ -997,9 +1010,10 @@ func (i *InterleavingIter) verify(kv *base.InternalKV) *base.InternalKV {
switch {
case i.dir == -1 && i.spanMarkerTruncated:
panic("pebble: invariant violation: truncated span key in reverse iteration")
case kv != nil && i.lower != nil && i.cmp(kv.K.UserKey, i.lower) < 0:
case kv != nil && i.opts.LowerBound != nil && i.cmp(kv.K.UserKey, i.opts.LowerBound) < 0:
panic("pebble: invariant violation: key < lower bound")
case kv != nil && i.upper != nil && i.cmp(kv.K.UserKey, i.upper) >= 0:
case kv != nil && i.opts.UpperBound != nil &&
!base.UserKeyExclusive(i.opts.UpperBound).IsUpperBoundForInternalKey(i.comparer.Compare, kv.K):
panic("pebble: invariant violation: key ≥ upper bound")
case i.err != nil && kv != nil:
panic("pebble: invariant violation: accumulated error swallowed")
Expand All @@ -1019,25 +1033,25 @@ func (i *InterleavingIter) savedKeyspan() {
// hasn't been updated with the current keyspan yet.
func (i *InterleavingIter) maybeUpdateMask() {
switch {
case i.mask == nil, i.maskSpanChangedCalled:
case i.opts.Mask == nil, i.maskSpanChangedCalled:
return
case !i.withinSpan || i.span.Empty():
i.clearMask()
case i.truncated:
i.mask.SpanChanged(&i.truncatedSpan)
i.opts.Mask.SpanChanged(&i.truncatedSpan)
i.maskSpanChangedCalled = true
default:
i.mask.SpanChanged(i.span)
i.opts.Mask.SpanChanged(i.span)
i.maskSpanChangedCalled = true
}
}

// clearMask clears the current mask, if a mask is configured and no mask should
// be active.
func (i *InterleavingIter) clearMask() {
if i.mask != nil {
if i.opts.Mask != nil {
i.maskSpanChangedCalled = false
i.mask.SpanChanged(nil)
i.opts.Mask.SpanChanged(nil)
}
}

Expand All @@ -1048,6 +1062,13 @@ func (i *InterleavingIter) startKey() []byte {
return i.span.Start
}

func (i *InterleavingIter) endKey() []byte {
if i.truncated {
return i.truncatedSpan.End
}
return i.span.End
}

func (i *InterleavingIter) savePoint(kv *base.InternalKV) {
i.pointKV = kv
if kv == nil {
Expand Down Expand Up @@ -1079,7 +1100,7 @@ func (i *InterleavingIter) Span() *Span {

// SetBounds implements (base.InternalIterator).SetBounds.
func (i *InterleavingIter) SetBounds(lower, upper []byte) {
i.lower, i.upper = lower, upper
i.opts.LowerBound, i.opts.UpperBound = lower, upper
i.pointIter.SetBounds(lower, upper)
i.Invalidate()
}
Expand Down
1 change: 1 addition & 0 deletions internal/keyspan/interleaving_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func runInterleavingIterTest(t *testing.T, filename string) {
if cmdArg, ok := td.Arg("masking-threshold"); ok {
hooks.threshold = []byte(strings.Join(cmdArg.Vals, ""))
}
opts.InterleaveEndKeys = td.HasArg("interleave-end-keys")
iter.Init(testkeys.Comparer, &pointIter, keyspanIter, opts)
// Clear any previous bounds.
pointIter.SetBounds(nil, nil)
Expand Down
Loading
Loading