Skip to content

Commit

Permalink
Support WriteBatch API in managed mode (#948)
Browse files Browse the repository at this point in the history
This commit adds a new `WriteBatchAt(commitTs)` API which allows the user to use 
write batch in managed mode.

Fixes #944


(cherry picked from commit 7f43769)
  • Loading branch information
Ibrahim Jarif committed Mar 10, 2020
1 parent 473a2f5 commit 68da170
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
9 changes: 9 additions & 0 deletions batch.go
Expand Up @@ -29,6 +29,7 @@ type WriteBatch struct {
db *DB
throttle *y.Throttle
err error
commitTs uint64
}

// NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes,
Expand All @@ -37,6 +38,13 @@ type WriteBatch struct {
// creating and committing transactions. Due to the nature of SSI guaratees provided by Badger,
// blind writes can never encounter transaction conflicts (ErrConflict).
func (db *DB) NewWriteBatch() *WriteBatch {
if db.opt.managedTxns {
panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead")
}
return db.newWriteBatch()
}

func (db *DB) newWriteBatch() *WriteBatch {
return &WriteBatch{
db: db,
txn: db.newTransaction(true, true),
Expand Down Expand Up @@ -136,6 +144,7 @@ func (wb *WriteBatch) commit() error {
wb.txn.CommitWith(wb.callback)
wb.txn = wb.db.newTransaction(true, true)
wb.txn.readTs = 0 // We're not reading anything.
wb.txn.commitTs = wb.commitTs
return wb.err
}

Expand Down
13 changes: 13 additions & 0 deletions managed_db.go
Expand Up @@ -40,6 +40,19 @@ func (db *DB) NewTransactionAt(readTs uint64, update bool) *Txn {
return txn
}

// NewWriteBatchAt is similar to NewWriteBatch but it allows user to set the commit timestamp.
// NewWriteBatchAt is supposed to be used only in the managed mode.
func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {
if !db.opt.managedTxns {
panic("cannot use NewWriteBatchAt with managedDB=false. Use NewWriteBatch instead")
}

wb := db.newWriteBatch()
wb.commitTs = commitTs
wb.txn.commitTs = commitTs
return wb
}

// CommitAt commits the transaction, following the same logic as Commit(), but
// at the given commit timestamp. This will panic if not used with managed transactions.
//
Expand Down
46 changes: 46 additions & 0 deletions managed_db_test.go
Expand Up @@ -570,3 +570,49 @@ func TestDropPrefixRace(t *testing.T) {
require.True(t, after < before)
db.Close()
}

func TestWriteBatchManagedMode(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
opt := DefaultOptions("")
opt.managedTxns = true
opt.MaxTableSize = 1 << 15 // This would create multiple transactions in write batch.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewWriteBatchAt(1)
defer wb.Cancel()

N, M := 50000, 1000
start := time.Now()

for i := 0; i < N; i++ {
require.NoError(t, wb.Set(key(i), val(i)))
}
for i := 0; i < M; i++ {
require.NoError(t, wb.Delete(key(i)))
}
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))

err := db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()

i := M
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, N, i)
return nil
})
require.NoError(t, err)
})
}

0 comments on commit 68da170

Please sign in to comment.