Skip to content

Commit

Permalink
Allow multiple iterators to run concurrently for the same RO txn. (#591)
Browse files Browse the repository at this point in the history
- Support multiple read-only iterators, as per request from blevesearch/bleve#591.

- Disallow the public APIs from accepting keys prefixed with `!badger!`, fixing #582.

- Make closing an iterator twice a safe no-op, fixing #588.
  • Loading branch information
manishrjain committed Sep 25, 2018
1 parent c10276c commit 41d9656
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 18 deletions.
98 changes: 98 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func runBadgerTest(t *testing.T, opts *Options, test func(t *testing.T, db *DB))
if opts == nil {
opts = new(Options)
*opts = getTestOptions(dir)
} else {
opts.Dir = dir
opts.ValueDir = dir
}
db, err := Open(*opts)
require.NoError(t, err)
Expand Down Expand Up @@ -656,6 +659,73 @@ func TestIterateDeleted(t *testing.T) {
})
}

// TestIterateParallel exercises the iterator concurrency rules of a transaction.
// It writes N keys (one per committed txn), then checks that a read-write txn
// panics when a second iterator is requested or when it is discarded while an
// iterator is still open, and finally runs three iterators concurrently over a
// single read-only txn, each of which must see all N keys.
func TestIterateParallel(t *testing.T) {
	// key builds "account-" followed by the big-endian 4-byte account number.
	key := func(account int) []byte {
		var b [4]byte
		binary.BigEndian.PutUint32(b[:], uint32(account))
		return append([]byte("account-"), b[:]...)
	}

	N := 100000

	// iterate is run on its own goroutine. It therefore reports failures with
	// t.Errorf and returns, instead of using require.*: require calls
	// t.FailNow, which must only be called from the goroutine running the
	// test function.
	iterate := func(txn *Txn, wg *sync.WaitGroup) {
		defer wg.Done()
		itr := txn.NewIterator(DefaultIteratorOptions)
		defer itr.Close()

		var count int
		for itr.Rewind(); itr.Valid(); itr.Next() {
			count++
			item := itr.Item()
			k := item.Key()
			if len(k) < 8 || string(k[:8]) != "account-" {
				t.Errorf("key %q does not start with %q", k, "account-")
				return
			}
			mismatch := false
			err := item.Value(func(val []byte) {
				if string(val) != "1000" {
					mismatch = true
					t.Errorf("key %q: got value %q, want %q", k, val, "1000")
				}
			})
			if err != nil {
				t.Errorf("key %q: reading value: %v", k, err)
				return
			}
			if mismatch {
				// Stop at the first bad value rather than reporting up to N errors.
				return
			}
		}
		if count != N {
			t.Errorf("iterated over %d keys, want %d", count, N)
		}
		itr.Close() // Double close.
	}

	opt := DefaultOptions
	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
		var wg sync.WaitGroup
		txns := make([]*Txn, 0, N)
		for i := 0; i < N; i++ {
			wg.Add(1)
			txn := db.NewTransaction(true)
			require.NoError(t, txn.Set(key(i), []byte("1000")))
			txns = append(txns, txn)
		}
		for _, txn := range txns {
			// The commit callback signals completion of each write via wg.
			y.Check(txn.Commit(func(err error) {
				y.Check(err)
				wg.Done()
			}))
		}

		wg.Wait()

		// Check that a RW txn can't run multiple iterators.
		txn := db.NewTransaction(true)
		itr := txn.NewIterator(DefaultIteratorOptions)
		require.Panics(t, func() {
			txn.NewIterator(DefaultIteratorOptions)
		})
		// Discarding while the first iterator is still open must panic too.
		require.Panics(t, txn.Discard)
		itr.Close()
		txn.Discard()

		// Run multiple iterators for a RO txn.
		txn = db.NewTransaction(false)
		defer txn.Discard()
		wg.Add(3)
		go iterate(txn, &wg)
		go iterate(txn, &wg)
		go iterate(txn, &wg)
		wg.Wait()
	})
}

func TestDeleteWithoutSyncWrite(t *testing.T) {
dir, err := ioutil.TempDir("", "badger")
require.NoError(t, err)
Expand Down Expand Up @@ -699,6 +769,34 @@ func TestPidFile(t *testing.T) {
})
}

// TestInvalidKey checks that Set rejects keys carrying the reserved "!badger!"
// prefix with ErrInvalidKey, while the key "!badger" (no trailing '!') can be
// written and then read back with its value intact.
func TestInvalidKey(t *testing.T) {
	runBadgerTest(t, nil, func(t *testing.T, db *DB) {
		updateErr := db.Update(func(txn *Txn) error {
			// Both of these start with the reserved prefix and must be refused.
			for _, reserved := range []string{"!badger!head", "!badger!"} {
				require.Equal(t, ErrInvalidKey, txn.Set([]byte(reserved), nil))
			}

			// One character short of the reserved prefix: a legal user key.
			setErr := txn.Set([]byte("!badger"), []byte("BadgerDB"))
			require.NoError(t, setErr)
			return setErr
		})
		require.NoError(t, updateErr)

		viewErr := db.View(func(txn *Txn) error {
			item, err := txn.Get([]byte("!badger"))
			if err != nil {
				return err
			}
			valueErr := item.Value(func(val []byte) {
				require.Equal(t, []byte("BadgerDB"), val)
			})
			require.NoError(t, valueErr)
			return nil
		})
		require.NoError(t, viewErr)
	})
}

func TestBigKeyValuePairs(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
bigK := make([]byte, maxKeySize+1)
Expand Down
4 changes: 4 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ var (
// ErrEmptyKey is returned if an empty key is passed on an update function.
ErrEmptyKey = errors.New("Key cannot be empty")

// ErrInvalidKey is returned if the key has a special !badger! prefix,
// reserved for internal usage.
ErrInvalidKey = errors.New("Key is using a reserved !badger! prefix")

// ErrRetry is returned when a log file containing the value is not found.
// This usually indicates that it may have been garbage collected, and the
// operation needs to be retried.
Expand Down
30 changes: 20 additions & 10 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/dgraph-io/badger/options"

"github.com/dgraph-io/badger/y"
farm "github.com/dgryski/go-farm"
)

type prefetchStatus uint8
Expand Down Expand Up @@ -323,15 +322,25 @@ type Iterator struct {
waste list

lastKey []byte // Used to skip over multiple versions of the same key.

closed bool
}

// NewIterator returns a new iterator. Depending upon the options, either only keys or both keys
// and values are fetched. The keys are returned in lexicographically sorted order.
// Using prefetch is highly recommended if you're doing a long running iteration.
// Avoid long running iterations in update transactions.
// Using prefetch is recommended if you're doing a long running iteration, for performance.
//
// Multiple Iterators:
// For a read-only txn, multiple iterators can be running simultaneously. However, for a read-write
// txn, only one can be running at one time to avoid race conditions, because Txn is thread-unsafe.
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
if atomic.AddInt32(&txn.numIterators, 1) > 1 {
panic("Only one iterator can be active at one time.")
if txn.discarded {
panic("Transaction has already been discarded")
}
// Do not change the order of the next if. We must track the number of running iterators.
if atomic.AddInt32(&txn.numIterators, 1) > 1 && txn.update {
atomic.AddInt32(&txn.numIterators, -1)
panic("Only one iterator can be active at one time, for a RW txn.")
}

tables, decr := txn.db.getMemTables()
Expand Down Expand Up @@ -366,10 +375,7 @@ func (it *Iterator) newItem() *Item {
// This item is only valid until it.Next() gets called.
func (it *Iterator) Item() *Item {
	// addReadKey records the key's fingerprint only when the txn is an update
	// txn, so no separate update check or inline append is needed here; doing
	// both would record the same read twice.
	it.txn.addReadKey(it.item.Key())
	return it.item
}

Expand All @@ -384,8 +390,12 @@ func (it *Iterator) ValidForPrefix(prefix []byte) bool {

// Close would close the iterator. It is important to call this when you're done with iteration.
func (it *Iterator) Close() {
it.iitr.Close()
if it.closed {
return
}
it.closed = true

it.iitr.Close()
// It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
// goroutines behind, which are waiting to acquire file read locks after DB has been closed.
waitFor := func(l list) {
Expand Down
25 changes: 17 additions & 8 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,21 +350,24 @@ func (txn *Txn) SetWithTTL(key, val []byte, dur time.Duration) error {
}

func (txn *Txn) modify(e *Entry) error {
if !txn.update {
switch {
case !txn.update:
return ErrReadOnlyTxn
} else if txn.discarded {
case txn.discarded:
return ErrDiscardedTxn
} else if len(e.Key) == 0 {
case len(e.Key) == 0:
return ErrEmptyKey
} else if len(e.Key) > maxKeySize {
case bytes.HasPrefix(e.Key, badgerPrefix):
return ErrInvalidKey
case len(e.Key) > maxKeySize:
return exceedsMaxKeySizeError(e.Key)
} else if int64(len(e.Value)) > txn.db.opt.ValueLogFileSize {
case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
return exceedsMaxValueSizeError(e.Value, txn.db.opt.ValueLogFileSize)
}

if err := txn.checkSize(e); err != nil {
return err
}

fp := farm.Fingerprint64(e.Key) // Avoid dealing with byte arrays.
txn.writes = append(txn.writes, fp)
txn.pendingWrites[string(e.Key)] = e
Expand Down Expand Up @@ -424,8 +427,7 @@ func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
}
// Only track reads if this is update txn. No need to track read if txn serviced it
// internally.
fp := farm.Fingerprint64(key)
txn.reads = append(txn.reads, fp)
txn.addReadKey(key)
}

seek := y.KeyWithTs(key, txn.readTs)
Expand All @@ -451,6 +453,13 @@ func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
return item, nil
}

// addReadKey appends the fingerprint of key to txn.reads when txn is an update
// transaction. For any other transaction it does nothing.
func (txn *Txn) addReadKey(key []byte) {
	if !txn.update {
		return
	}
	txn.reads = append(txn.reads, farm.Fingerprint64(key))
}

// Discard discards a created transaction. This method is very important and must be called. Commit
// method calls this internally, however, calling this multiple times doesn't cause any issues. So,
// this can safely be called via a defer right when transaction is created.
Expand Down

0 comments on commit 41d9656

Please sign in to comment.