Skip to content

Commit

Permalink
Add WriteBatchAt API
Browse files Browse the repository at this point in the history
  • Loading branch information
jarifibrahim committed Jul 29, 2019
1 parent d8e1fcf commit 1bd48bb
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 18 deletions.
10 changes: 6 additions & 4 deletions badger/cmd/bank.go
Expand Up @@ -364,7 +364,8 @@ func runTest(cmd *cobra.Command, args []string) error {
defer tmpDb.Close()
}

wb := db.NewWriteBatch()
wb, err := db.NewWriteBatch()
y.Check(err)
for i := 0; i < numAccounts; i++ {
y.Check(wb.Set(key(i), toSlice(initialBal)))
}
Expand Down Expand Up @@ -458,8 +459,8 @@ func runTest(cmd *cobra.Command, args []string) error {
err = tmpDb.DropAll()
y.Check(err)

batch := tmpDb.NewWriteBatch()

batch, err := tmpDb.NewWriteBatch()
y.Check(err)
stream := db.NewStream()
stream.Send = func(list *pb.KVList) error {
for _, kv := range list.Kv {
Expand Down Expand Up @@ -524,7 +525,8 @@ func runTest(cmd *cobra.Command, args []string) error {
accountIDS = append(accountIDS, key(i))
}
updater := func(kvs *pb.KVList) {
batch := subscribeDB.NewWriteBatch()
batch, err := subscribeDB.NewWriteBatch()
y.Check(err)
for _, kv := range kvs.GetKv() {
y.Check(batch.Set(kv.Key, kv.Value))
}
Expand Down
3 changes: 2 additions & 1 deletion badger/cmd/write_bench.go
Expand Up @@ -74,7 +74,8 @@ func writeRandom(db *badger.DB, num uint64) error {
y.Check2(rand.Read(value))

es := uint64(keySz + valSz) // entry size is keySz + valSz
batch := db.NewWriteBatch()
batch, err := db.NewWriteBatch()
y.Check(err)
for i := uint64(1); i <= num; i++ {
key := make([]byte, keySz)
y.Check2(rand.Read(key))
Expand Down
27 changes: 26 additions & 1 deletion batch.go
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"
)

// WriteBatch holds the necessary info to perform batched writes.
Expand All @@ -29,14 +30,37 @@ type WriteBatch struct {
db *DB
throttle *y.Throttle
err error
commitTs uint64
}

// NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes,
// batching them up as tightly as possible in a single transaction and using callbacks to avoid
// waiting for them to commit, thus achieving good performance. This API hides away the logic of
// creating and committing transactions. Due to the nature of SSI guaratees provided by Badger,
// blind writes can never encounter transaction conflicts (ErrConflict).
func (db *DB) NewWriteBatch() *WriteBatch {
func (db *DB) NewWriteBatch() (*WriteBatch, error) {
if db.opt.managedTxns {
return nil,
errors.New("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead")
}
return db.newWriteBatch(), nil
}

// NewWriteBatchAt is similar to NewWriteBatch but it allows user to set the commit timestamp.
// NewWriteBatchAt is supposed to be used in the managed mode.
func (db *DB) NewWriteBatchAt(ts uint64) (*WriteBatch, error) {
if !db.opt.managedTxns {
return nil, errors.New(
"cannot use NewWriteBatchAt with managedDB=false. Use NewTransaction instead")
}

wb := db.newWriteBatch()
wb.commitTs = ts
wb.txn.commitTs = ts
return wb, nil
}

func (db *DB) newWriteBatch() *WriteBatch {
return &WriteBatch{
db: db,
txn: db.newTransaction(true, true),
Expand Down Expand Up @@ -136,6 +160,7 @@ func (wb *WriteBatch) commit() error {
wb.txn.CommitWith(wb.callback)
wb.txn = wb.db.newTransaction(true, true)
wb.txn.readTs = 0 // We're not reading anything.
wb.txn.commitTs = wb.commitTs
return wb.err
}

Expand Down
51 changes: 49 additions & 2 deletions batch_test.go
Expand Up @@ -33,7 +33,54 @@ func TestWriteBatch(t *testing.T) {
}

runBadgerTest(t, nil, func(t *testing.T, db *DB) {
wb := db.NewWriteBatch()
wb, err := db.NewWriteBatch()
require.NoError(t, err)
defer wb.Cancel()

N, M := 50000, 1000
start := time.Now()

for i := 0; i < N; i++ {
require.NoError(t, wb.Set(key(i), val(i)))
}
for i := 0; i < M; i++ {
require.NoError(t, wb.Delete(key(i)))
}
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))

err = db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()

i := M
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, N, i)
return nil
})
require.NoError(t, err)
})
}

func TestWriteBatchManagedMode(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
opt := DefaultOptions("")
opt.managedTxns = true
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb, err := db.NewWriteBatchAt(10)
require.NoError(t, err)
defer wb.Cancel()

N, M := 50000, 1000
Expand All @@ -48,7 +95,7 @@ func TestWriteBatch(t *testing.T) {
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))

err := db.View(func(txn *Txn) error {
err = db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()

Expand Down
6 changes: 4 additions & 2 deletions iterator_test.go
Expand Up @@ -74,7 +74,8 @@ func TestIteratePrefix(t *testing.T) {
val := []byte("OK")
n := 10000

batch := db.NewWriteBatch()
batch, err := db.NewWriteBatch()
require.NoError(t, err)
for i := 0; i < n; i++ {
if (i % 1000) == 0 {
t.Logf("Put i=%d\n", i)
Expand Down Expand Up @@ -191,7 +192,8 @@ func BenchmarkIteratePrefixSingleKey(b *testing.B) {
return []byte(fmt.Sprintf("%06d", i))
}

batch := db.NewWriteBatch()
batch, err := db.NewWriteBatch()
require.NoError(b, err)
for i := 0; i < N; i++ {
y.Check(batch.Set(bkey(i), val))
}
Expand Down
24 changes: 16 additions & 8 deletions managed_db_test.go
Expand Up @@ -112,7 +112,8 @@ func TestDropAll(t *testing.T) {

N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
writer, err := db.NewWriteBatch()
require.NoError(t, err)
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
Expand Down Expand Up @@ -148,7 +149,8 @@ func TestDropAllTwice(t *testing.T) {

N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
writer, err := db.NewWriteBatch()
require.NoError(t, err)
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
Expand Down Expand Up @@ -176,7 +178,8 @@ func TestDropAllWithPendingTxn(t *testing.T) {

N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
writer, err := db.NewWriteBatch()
require.NoError(t, err)
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
Expand Down Expand Up @@ -241,7 +244,8 @@ func TestDropReadOnly(t *testing.T) {
require.NoError(t, err)
N := uint64(1000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
writer, err := db.NewWriteBatch()
require.NoError(t, err)
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
Expand Down Expand Up @@ -273,7 +277,8 @@ func TestWriteAfterClose(t *testing.T) {
require.NoError(t, err)
N := uint64(1000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
writer, err := db.NewWriteBatch()
require.NoError(t, err)
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
Expand Down Expand Up @@ -364,7 +369,8 @@ func TestDropPrefix(t *testing.T) {

N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
writer, err := db.NewWriteBatch()
require.NoError(t, err)
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
Expand Down Expand Up @@ -415,7 +421,8 @@ func TestDropPrefixWithPendingTxn(t *testing.T) {

N := uint64(10000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
writer, err := db.NewWriteBatch()
require.NoError(t, err)
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
Expand Down Expand Up @@ -483,7 +490,8 @@ func TestDropPrefixReadOnly(t *testing.T) {
require.NoError(t, err)
N := uint64(1000)
populate := func(db *DB) {
writer := db.NewWriteBatch()
writer, err := db.NewWriteBatch()
require.NoError(t, err)
for i := uint64(0); i < N; i++ {
require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
}
Expand Down

0 comments on commit 1bd48bb

Please sign in to comment.