Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ var DefaultIteratorOptions = IteratorOptions{

// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
type Iterator struct {
iitr *y.MergeIterator
iitr y.Iterator
txn *Txn
readTs uint64

Expand Down Expand Up @@ -466,9 +466,10 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
iters = append(iters, tables[i].NewUniIterator(opt.Reverse))
}
iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.

res := &Iterator{
txn: txn,
iitr: y.NewMergeIterator(iters, opt.Reverse),
iitr: table.NewMergeIterator(iters, opt.Reverse),
opt: opt,
readTs: txn.readTs,
}
Expand Down
2 changes: 1 addition & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (s *levelsController) compactBuildTables(
valid = append(valid, table)
}
iters = append(iters, table.NewConcatIterator(valid, false))
it := y.NewMergeIterator(iters, false)
it := table.NewMergeIterator(iters, false)
defer it.Close() // Important to close the iterator to do ref counting.

it.Rewind()
Expand Down
213 changes: 213 additions & 0 deletions table/merge_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright 2019 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package table

import (
"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"
)

// MergeIterator merges multiple iterators.
// NOTE: MergeIterator owns the array of iterators and is responsible for closing them.
type MergeIterator struct {
left node
right node
small *node
reverse bool
}

type node struct {
valid bool
key []byte
iter y.Iterator

// The two iterators are type asserted from `y.Iterator`, used to inline more function calls.
// Calling functions on concrete types is much faster (about 25-30%) than calling the
// interface's function.
merge *MergeIterator
concat *ConcatIterator
}

func (n *node) setIterator(iter y.Iterator) {
n.iter = iter
// It's okay if the type assertion below fails and n.merge/n.concat are set to nil.
// We handle the nil values of merge and concat in all the methods.
n.merge, _ = iter.(*MergeIterator)
n.concat, _ = iter.(*ConcatIterator)
}

func (n *node) setKey() {
if n.merge != nil {
n.valid = n.merge.small.valid
if n.valid {
n.key = n.merge.small.key
}
} else if n.concat != nil {
n.valid = n.concat.Valid()
if n.valid {
n.key = n.concat.Key()
}
} else {
n.valid = n.iter.Valid()
if n.valid {
n.key = n.iter.Key()
}
}
}

func (n *node) next() {
if n.merge != nil {
n.merge.Next()
} else if n.concat != nil {
n.concat.Next()
} else {
n.iter.Next()
}
n.setKey()
}

func (n *node) rewind() {
n.iter.Rewind()
n.setKey()
}

func (n *node) seek(key []byte) {
n.iter.Seek(key)
n.setKey()
}

func (mi *MergeIterator) fix() {
if !mi.bigger().valid {
return
}
if !mi.small.valid {
mi.swapSmall()
return
}
cmp := y.CompareKeys(mi.small.key, mi.bigger().key)
// Both the keys are equal.
if cmp == 0 {
// In case of same keys, move the right iterator ahead.
mi.right.next()
if &mi.right == mi.small {
mi.swapSmall()
}
return
} else if cmp < 0 { // Small is less than bigger().
if mi.reverse {
mi.swapSmall()
} else {
// we don't need to do anything. Small already points to the smallest.
}
return
} else { // bigger() is less than small.
if mi.reverse {
// Do nothing since we're iterating in reverse. Small currently points to
// the bigger key and that's okay in reverse iteration.
} else {
mi.swapSmall()
}
return
}
}

func (mi *MergeIterator) bigger() *node {
if mi.small == &mi.left {
return &mi.right
}
return &mi.left
}

func (mi *MergeIterator) swapSmall() {
if mi.small == &mi.left {
mi.small = &mi.right
return
}
if mi.small == &mi.right {
mi.small = &mi.left
return
}
}

// Next returns the next element. If it is the same as the current key, ignore it.
func (mi *MergeIterator) Next() {
mi.small.next()
mi.fix()
}

// Rewind seeks to first element (or last element for reverse iterator).
func (mi *MergeIterator) Rewind() {
mi.left.rewind()
mi.right.rewind()
mi.fix()
}

// Seek brings us to element with key >= given key.
func (mi *MergeIterator) Seek(key []byte) {
mi.left.seek(key)
mi.right.seek(key)
mi.fix()
}

// Valid returns whether the MergeIterator is at a valid element.
func (mi *MergeIterator) Valid() bool {
return mi.small.valid
}

// Key returns the key associated with the current iterator.
func (mi *MergeIterator) Key() []byte {
return mi.small.key
}

// Value returns the value associated with the iterator.
func (mi *MergeIterator) Value() y.ValueStruct {
return mi.small.iter.Value()
}

// Close implements y.Iterator.
func (mi *MergeIterator) Close() error {
err1 := mi.left.iter.Close()
err2 := mi.right.iter.Close()
if err1 != nil {
return errors.Wrap(err1, "MergeIterator")
}
return errors.Wrap(err2, "MergeIterator")
}

// NewMergeIterator creates a merge iterator.
func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator {
if len(iters) == 0 {
return nil
} else if len(iters) == 1 {
return iters[0]
} else if len(iters) == 2 {
mi := &MergeIterator{
reverse: reverse,
}
mi.left.setIterator(iters[0])
mi.right.setIterator(iters[1])
// Assign left iterator randomly. This will be fixed when user calls rewind/seek.
mi.small = &mi.left
return mi
}
mid := len(iters) / 2
return NewMergeIterator(
[]y.Iterator{
NewMergeIterator(iters[:mid], reverse),
NewMergeIterator(iters[mid:], reverse),
}, reverse)
}
Loading