Skip to content

Commit

Permalink
mvcc: allow large concurrent reads under light write workload
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Feb 8, 2018
1 parent b83244b commit 8ef182b
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 20 deletions.
13 changes: 4 additions & 9 deletions internal/mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ func (b *backend) run() {
b.batchTx.CommitAndStop()
return
}
b.batchTx.Commit()
if b.batchTx.pending != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
}
}
Expand All @@ -293,9 +295,6 @@ func (b *backend) Defrag() error {
return err
}

// commit to update metadata like db.size
b.batchTx.Commit()

return nil
}

Expand All @@ -310,11 +309,7 @@ func (b *backend) defrag() error {
b.mu.Lock()
defer b.mu.Unlock()

// block concurrent read requests while resetting tx
b.readTx.mu.Lock()
defer b.readTx.mu.Unlock()

b.batchTx.unsafeCommit(true)
b.batchTx.commit(true)
b.batchTx.tx = nil

tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
Expand Down
12 changes: 1 addition & 11 deletions internal/mvcc/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte
isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
limit = 1
}

for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
vs = append(vs, cv)
keys = append(keys, ck)
Expand Down Expand Up @@ -158,17 +159,6 @@ func (t *batchTx) commit(stop bool) {
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()

// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)',
// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size().
// Server must make sure 'batchTx.commit(false)' does not follow
// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call).
size := t.tx.Size()
db := t.tx.DB()
atomic.StoreInt64(&t.backend.size, size)
atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
return
}

Expand Down

0 comments on commit 8ef182b

Please sign in to comment.