Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support WriteBatch API in managed mode #948

Merged
merged 5 commits into from Aug 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions batch.go
Expand Up @@ -29,6 +29,7 @@ type WriteBatch struct {
db *DB
throttle *y.Throttle
err error
commitTs uint64
}

// NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes,
Expand All @@ -37,6 +38,13 @@ type WriteBatch struct {
// creating and committing transactions. Due to the nature of SSI guaratees provided by Badger,
// blind writes can never encounter transaction conflicts (ErrConflict).
func (db *DB) NewWriteBatch() *WriteBatch {
if db.opt.managedTxns {
panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead")
}
return db.newWriteBatch()
}

func (db *DB) newWriteBatch() *WriteBatch {
return &WriteBatch{
db: db,
txn: db.newTransaction(true, true),
Expand Down Expand Up @@ -136,6 +144,7 @@ func (wb *WriteBatch) commit() error {
wb.txn.CommitWith(wb.callback)
wb.txn = wb.db.newTransaction(true, true)
wb.txn.readTs = 0 // We're not reading anything.
wb.txn.commitTs = wb.commitTs
return wb.err
}

Expand Down
13 changes: 13 additions & 0 deletions managed_db.go
Expand Up @@ -40,6 +40,19 @@ func (db *DB) NewTransactionAt(readTs uint64, update bool) *Txn {
return txn
}

// NewWriteBatchAt is similar to NewWriteBatch but it allows user to set the commit timestamp.
// NewWriteBatchAt is supposed to be used only in the managed mode.
func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {
if !db.opt.managedTxns {
panic("cannot use NewWriteBatchAt with managedDB=false. Use NewWriteBatch instead")
}

wb := db.newWriteBatch()
wb.commitTs = commitTs
wb.txn.commitTs = commitTs
return wb
}

// CommitAt commits the transaction, following the same logic as Commit(), but
// at the given commit timestamp. This will panic if not used with managed transactions.
//
Expand Down
46 changes: 46 additions & 0 deletions managed_db_test.go
Expand Up @@ -570,3 +570,49 @@ func TestDropPrefixRace(t *testing.T) {
require.True(t, after < before)
db.Close()
}

func TestWriteBatchManagedMode(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
opt := DefaultOptions("")
opt.managedTxns = true
opt.MaxTableSize = 1 << 15 // This would create multiple transactions in write batch.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewWriteBatchAt(1)
defer wb.Cancel()

N, M := 50000, 1000
start := time.Now()

for i := 0; i < N; i++ {
require.NoError(t, wb.Set(key(i), val(i)))
}
for i := 0; i < M; i++ {
require.NoError(t, wb.Delete(key(i)))
}
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))

err := db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()

i := M
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, N, i)
return nil
})
require.NoError(t, err)
})
}