Skip to content

Commit

Permalink
Add panics if an unclosed iterator is found. Or, if multiple iterator…
Browse files Browse the repository at this point in the history
…s are run in one transaction.
  • Loading branch information
manishrjain committed Jun 19, 2018
1 parent 3340933 commit b1ad1e9
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 3 deletions.
2 changes: 2 additions & 0 deletions backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
opts := DefaultIteratorOptions
opts.AllVersions = true
it := txn.NewIterator(opts)
defer it.Close()

for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
if item.Version() < since {
Expand Down
1 change: 1 addition & 0 deletions backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestDumpLoad(t *testing.T) {
opts := DefaultIteratorOptions
opts.AllVersions = true
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
Expand Down
2 changes: 2 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,8 @@ func (op *MergeOperator) iterateAndMerge(txn *Txn) (val []byte, err error) {
opt := DefaultIteratorOptions
opt.AllVersions = true
it := txn.NewIterator(opt)
defer it.Close()

var numVersions int
for it.Rewind(); it.ValidForPrefix(op.key); it.Next() {
item := it.Item()
Expand Down
5 changes: 5 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ func TestDiscardVersionsBelow(t *testing.T) {
// Verify that there are 4 versions, and record 3rd version (2nd from top in iteration)
db.View(func(txn *Txn) error {
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
count++
Expand All @@ -885,6 +886,7 @@ func TestDiscardVersionsBelow(t *testing.T) {
// below ts have been deleted.
db.View(func(txn *Txn) error {
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
count++
Expand Down Expand Up @@ -931,6 +933,7 @@ func TestExpiry(t *testing.T) {
opts.PrefetchValues = false
err = db.View(func(txn *Txn) error {
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
count++
Expand Down Expand Up @@ -1099,6 +1102,7 @@ func TestWriteDeadlock(t *testing.T) {
opt := DefaultIteratorOptions
opt.PrefetchValues = false
it := txn.NewIterator(opt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()

Expand Down Expand Up @@ -1558,6 +1562,7 @@ func ExampleTxn_NewIterator() {
var count int
err = db.View(func(txn *Txn) error {
it := txn.NewIterator(opt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
count++
}
Expand Down
6 changes: 6 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/badger/options"
Expand Down Expand Up @@ -316,6 +317,10 @@ type Iterator struct {
// Using prefetch is highly recommended if you're doing a long running iteration.
// Avoid long running iterations in update transactions.
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
if atomic.AddInt32(&txn.numIterators, 1) > 1 {
panic("Only one iterator can be active at one time.")
}

tables, decr := txn.db.getMemTables()
defer decr()
txn.db.vlog.incrIteratorCount()
Expand Down Expand Up @@ -382,6 +387,7 @@ func (it *Iterator) Close() {

// TODO: We could handle this error.
_ = it.txn.db.vlog.decrIteratorCount()
atomic.AddInt32(&it.txn.numIterators, -1)
}

// Next would advance the iterator by one. Always check it.Valid() after a Next()
Expand Down
9 changes: 6 additions & 3 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ type Txn struct {
callbacks []func()
discarded bool

size int64
count int64
size int64
count int64
numIterators int32
}

type pendingWritesIterator struct {
Expand Down Expand Up @@ -423,10 +424,12 @@ func (txn *Txn) Discard() {
if txn.discarded { // Avoid a re-run.
return
}
if atomic.LoadInt32(&txn.numIterators) > 0 {
panic("Unclosed iterator at time of Txn.Discard.")
}
txn.discarded = true
txn.db.orc.readMark.Done(txn.readTs)
txn.runCallbacks()

if txn.update {
txn.db.orc.decrRef()
}
Expand Down
13 changes: 13 additions & 0 deletions transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func TestTxnVersions(t *testing.T) {
}

checkIterator := func(itr *Iterator, i int) {
defer itr.Close()
count := 0
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
Expand Down Expand Up @@ -225,12 +226,14 @@ func TestTxnVersions(t *testing.T) {
opt.AllVersions = true
itr = txn.NewIterator(opt)
checkAllVersions(itr, i)
itr.Close()

opt = DefaultIteratorOptions
opt.AllVersions = true
opt.Reverse = true
itr = txn.NewIterator(opt)
checkAllVersions(itr, i)
itr.Close()

txn.Discard()
}
Expand Down Expand Up @@ -352,6 +355,7 @@ func TestTxnIterationEdgeCase(t *testing.T) {
require.Equal(t, uint64(4), db.orc.readTs())

checkIterator := func(itr *Iterator, expected []string) {
defer itr.Close()
var i int
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
Expand Down Expand Up @@ -435,6 +439,7 @@ func TestTxnIterationEdgeCase2(t *testing.T) {
require.Equal(t, uint64(4), db.orc.readTs())

checkIterator := func(itr *Iterator, expected []string) {
defer itr.Close()
var i int
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
Expand Down Expand Up @@ -463,6 +468,7 @@ func TestTxnIterationEdgeCase2(t *testing.T) {
itr.Seek(kc)
require.True(t, itr.Valid())
require.Equal(t, itr.item.Key(), kc)
itr.Close()

itr = txn.NewIterator(rev)
itr.Seek(ka)
Expand All @@ -471,6 +477,7 @@ func TestTxnIterationEdgeCase2(t *testing.T) {
itr.Seek(kc)
require.True(t, itr.Valid())
require.Equal(t, itr.item.Key(), kc)
itr.Close()

txn.readTs = 3
itr = txn.NewIterator(DefaultIteratorOptions)
Expand Down Expand Up @@ -537,6 +544,7 @@ func TestTxnIterationEdgeCase3(t *testing.T) {
itr.Seek([]byte("ac"))
require.True(t, itr.Valid())
require.Equal(t, itr.item.Key(), kc)
itr.Close()

// Keys: "abc", "ade"
// Read pending writes.
Expand All @@ -558,6 +566,7 @@ func TestTxnIterationEdgeCase3(t *testing.T) {
itr.Seek([]byte("ad"))
require.True(t, itr.Valid())
require.Equal(t, itr.item.Key(), kd)
itr.Close()

itr = txn.NewIterator(rev)
itr.Seek([]byte("ac"))
Expand All @@ -576,6 +585,7 @@ func TestTxnIterationEdgeCase3(t *testing.T) {
itr.Seek([]byte("ad"))
require.True(t, itr.Valid())
require.Equal(t, itr.item.Key(), kc)
itr.Close()

// Keys: "abc", "ade"
itr = txn2.NewIterator(rev)
Expand All @@ -595,6 +605,7 @@ func TestTxnIterationEdgeCase3(t *testing.T) {
itr.Seek([]byte("ac"))
require.True(t, itr.Valid())
require.Equal(t, itr.item.Key(), kb)
itr.Close()
})
}

Expand Down Expand Up @@ -630,6 +641,7 @@ func TestIteratorAllVersionsWithDeleted(t *testing.T) {
// Verify that deleted shows up when AllVersions is set.
err = db.View(func(txn *Txn) error {
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
count++
Expand Down Expand Up @@ -670,6 +682,7 @@ func TestIteratorAllVersionsWithDeleted2(t *testing.T) {
// Verify that deleted shows up when AllVersions is set.
err := db.View(func(txn *Txn) error {
it := txn.NewIterator(opts)
defer it.Close()
var count int
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
Expand Down

0 comments on commit b1ad1e9

Please sign in to comment.