Skip to content

Commit

Permalink
server: set multiple concurrentReadTx instances share one txReadBuffe…
Browse files Browse the repository at this point in the history
…r as long as no new write operations committed.
  • Loading branch information
wilsonwang371 committed May 9, 2021
1 parent aeb9b5f commit a4bda87
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
18 changes: 16 additions & 2 deletions server/mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type backend struct {
batchTx *batchTxBuffered

readTx *readTx
// concurrentReadTx is not supposed to write to its txReadBuffer
// so multiple concurrentReadTx instances can share one txReadBuffer
// as long as there is no new write operations done.
cachedPrevReadTxBuf *txReadBuffer

stopc chan struct{}
donec chan struct{}
Expand Down Expand Up @@ -183,7 +187,8 @@ func newBackend(bcfg BackendConfig) *backend {
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
isModified: false,
},
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
Expand Down Expand Up @@ -222,9 +227,18 @@ func (b *backend) ConcurrentReadTx() ReadTx {
// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
b.readTx.txWg.Add(1)
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
var modified bool
// check if new data has been committed since last txReadBuffer copying.
modified = b.readTx.buf.IsModified()
if modified || (!modified && b.cachedPrevReadTxBuf == nil) {
tmp := b.readTx.buf.unsafeCopy()
b.cachedPrevReadTxBuf = &tmp
}
b.readTx.buf.ResetModified()
// concurrentReadTx is not supposed to write to its txReadBuffer
return &concurrentReadTx{
baseReadTx: baseReadTx{
buf: b.readTx.buf.unsafeCopy(),
buf: *b.cachedPrevReadTxBuf,
txMu: b.readTx.txMu,
tx: b.readTx.tx,
buckets: b.readTx.buckets,
Expand Down
20 changes: 18 additions & 2 deletions server/mvcc/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sort"
)

const bucketBufferInitialSize = 512

// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
type txBuffer struct {
buckets map[string]*bucketBuffer
Expand Down Expand Up @@ -69,10 +71,23 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
rb.merge(wb)
}
txw.reset()
// mark read buffer modified
txr.isModified = true
}

// txReadBuffer accesses buffered updates.
type txReadBuffer struct{ txBuffer }
type txReadBuffer struct{
txBuffer
isModified bool
}

func (txr *txReadBuffer) IsModified() bool {
return txr.isModified
}

func (txr *txReadBuffer) ResetModified() {
txr.isModified = false
}

func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if b := txr.buckets[string(bucketName)]; b != nil {
Expand All @@ -94,6 +109,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
txBuffer: txBuffer{
buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
},
isModified: false,
}
for bucketName, bucket := range txr.txBuffer.buckets {
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
Expand All @@ -114,7 +130,7 @@ type bucketBuffer struct {
}

func newBucketBuffer() *bucketBuffer {
return &bucketBuffer{buf: make([]kv, 512), used: 0}
return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
}

func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
Expand Down

0 comments on commit a4bda87

Please sign in to comment.