Skip to content
This repository has been archived by the owner on Mar 9, 2019. It is now read-only.

Commit

Permalink
Merge pull request #118 from benbjohnson/commit-hook
Browse files Browse the repository at this point in the history
Add Tx.OnCommit() handler.
  • Loading branch information
benbjohnson committed Apr 4, 2014
2 parents d667ae0 + 394e42e commit af1551e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 12 deletions.
42 changes: 30 additions & 12 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ type txid uint64
// are using them. A long running read transaction can cause the database to
// quickly grow.
type Tx struct {
writable bool
managed bool
db *DB
meta *meta
buckets *buckets
nodes map[pgid]*node
pages map[pgid]*page
pending []*node
stats TxStats
writable bool
managed bool
db *DB
meta *meta
buckets *buckets
nodes map[pgid]*node
pages map[pgid]*page
pending []*node
stats TxStats
commitHandlers []func()
}

// init initializes the transaction.
Expand Down Expand Up @@ -175,6 +176,11 @@ func (t *Tx) DeleteBucket(name string) error {
return nil
}

// OnCommit adds a handler function to be executed after the transaction successfully commits.
func (t *Tx) OnCommit(fn func()) {
t.commitHandlers = append(t.commitHandlers, fn)
}

// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs.
func (t *Tx) Commit() error {
Expand All @@ -185,7 +191,6 @@ func (t *Tx) Commit() error {
} else if !t.writable {
return ErrTxNotWritable
}
defer t.close()

// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

Expand All @@ -197,13 +202,15 @@ func (t *Tx) Commit() error {
// spill data onto dirty pages.
startTime = time.Now()
if err := t.spill(); err != nil {
t.close()
return err
}
t.stats.SpillTime += time.Since(startTime)

// Spill buckets page.
p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
if err != nil {
t.close()
return err
}
t.buckets.write(p)
Expand All @@ -217,6 +224,7 @@ func (t *Tx) Commit() error {
t.db.freelist.free(t.id(), t.page(t.meta.freelist))
p, err = t.allocate((t.db.freelist.size() / t.db.pageSize) + 1)
if err != nil {
t.close()
return err
}
t.db.freelist.write(p)
Expand All @@ -225,15 +233,25 @@ func (t *Tx) Commit() error {
// Write dirty pages to disk.
startTime = time.Now()
if err := t.write(); err != nil {
t.close()
return err
}

// Write meta to disk.
if err := t.writeMeta(); err != nil {
t.close()
return err
}
t.stats.WriteTime += time.Since(startTime)

// Finalize the transaction.
t.close()

// Execute commit handlers now that the locks have been removed.
for _, fn := range t.commitHandlers {
fn()
}

return nil
}

Expand All @@ -250,13 +268,13 @@ func (t *Tx) Rollback() error {

func (t *Tx) close() {
if t.writable {
t.db.rwlock.Unlock()

// Merge statistics.
t.db.metalock.Lock()
t.db.stats.TxStats.add(&t.stats)
t.db.metalock.Unlock()

// Remove writer lock.
t.db.rwlock.Unlock()
} else {
t.db.removeTx(t)
}
Expand Down
28 changes: 28 additions & 0 deletions tx_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bolt

import (
"errors"
"fmt"
"math/rand"
"os"
Expand Down Expand Up @@ -484,6 +485,33 @@ func TestTxCursorIterateReverse(t *testing.T) {
fmt.Fprint(os.Stderr, "\n")
}

// Ensure that Tx commit handlers are called after a transaction successfully commits.
func TestTx_OnCommit(t *testing.T) {
var x int
withOpenDB(func(db *DB, path string) {
db.Update(func(tx *Tx) error {
tx.OnCommit(func() { x += 1 })
tx.OnCommit(func() { x += 2 })
return tx.CreateBucket("widgets")
})
})
assert.Equal(t, 3, x)
}

// Ensure that Tx commit handlers are NOT called after a transaction rolls back.
func TestTx_OnCommit_Rollback(t *testing.T) {
var x int
withOpenDB(func(db *DB, path string) {
db.Update(func(tx *Tx) error {
tx.OnCommit(func() { x += 1 })
tx.OnCommit(func() { x += 2 })
tx.CreateBucket("widgets")
return errors.New("rollback this commit")
})
})
assert.Equal(t, 0, x)
}

// Benchmark the performance iterating over a cursor.
func BenchmarkTxCursor(b *testing.B) {
indexes := rand.Perm(b.N)
Expand Down

0 comments on commit af1551e

Please sign in to comment.