diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index ecf941f1fe73..a962b19cede5 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -79,6 +79,12 @@ type Snapshot interface { Close() error } +type txReadBufferCache struct { + mu sync.Mutex + buf *txReadBuffer + bufVersion uint64 +} + type backend struct { // size and commits are used with atomic operations so they must be // 64-bit aligned, otherwise 32-bit tests will crash @@ -102,6 +108,10 @@ type backend struct { batchTx *batchTxBuffered readTx *readTx + // concurrentReadTx is not supposed to write to its txReadBuffer + // so multiple concurrentReadTx instances can share one txReadBuffer + // as long as no new write operations have been done. + cachedReadTxBuf txReadBufferCache stopc chan struct{} donec chan struct{} @@ -183,19 +193,29 @@ func newBackend(bcfg BackendConfig) *backend { readTx: &readTx{ baseReadTx: baseReadTx{ buf: txReadBuffer{ - txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + bufVersion: 0, }, buckets: make(map[string]*bolt.Bucket), txWg: new(sync.WaitGroup), txMu: new(sync.RWMutex), }, }, + cachedReadTxBuf: txReadBufferCache{ + mu: sync.Mutex{}, + bufVersion: 0, + buf: nil, + }, stopc: make(chan struct{}), donec: make(chan struct{}), lg: bcfg.Logger, } + + var readTxCopy txReadBuffer + readTxCopy = b.readTx.buf.unsafeCopy() + b.cachedReadTxBuf.buf = &readTxCopy b.batchTx = newBatchTxBuffered(b) // We set it after newBatchTxBuffered to skip the 'empty' commit. b.hooks = bcfg.Hooks @@ -221,10 +241,38 @@ func (b *backend) ConcurrentReadTx() ReadTx { defer b.readTx.RUnlock() // prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock(). b.readTx.txWg.Add(1) + // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval. 
+ var modifiedBufVersion uint64 + var buf *txReadBuffer + + // we are using a mutex to protect the critical region where only one concurrent read tx can set the cache + b.cachedReadTxBuf.mu.Lock() + modifiedBufVersion = b.readTx.buf.Version() + + // check if new data has been committed since the last txReadBuffer copy. + if modifiedBufVersion != b.cachedReadTxBuf.bufVersion { + // we need to release the lock to maximize parallelism. + b.cachedReadTxBuf.mu.Unlock() + tmp := b.readTx.buf.unsafeCopy() + b.cachedReadTxBuf.mu.Lock() + buf = &tmp + } else { + buf = b.cachedReadTxBuf.buf + } + + // it is possible the buffer version was updated during an unsafeCopy() + // if someone else has modified it, we give up on updating the cache. + if modifiedBufVersion == b.readTx.buf.Version() { + b.cachedReadTxBuf.buf = buf + b.cachedReadTxBuf.bufVersion = modifiedBufVersion + } + b.cachedReadTxBuf.mu.Unlock() + + // concurrentReadTx is not supposed to write to its txReadBuffer return &concurrentReadTx{ baseReadTx: baseReadTx{ - buf: b.readTx.buf.unsafeCopy(), + buf: *buf, txMu: b.readTx.txMu, tx: b.readTx.tx, buckets: b.readTx.buckets, diff --git a/server/mvcc/backend/tx_buffer.go b/server/mvcc/backend/tx_buffer.go index 4df6d0c5951d..a67a44da0d32 100644 --- a/server/mvcc/backend/tx_buffer.go +++ b/server/mvcc/backend/tx_buffer.go @@ -19,6 +19,8 @@ import ( "sort" ) +const bucketBufferInitialSize = 512 + // txBuffer handles functionality shared between txWriteBuffer and txReadBuffer. type txBuffer struct { buckets map[string]*bucketBuffer @@ -69,10 +71,22 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { rb.merge(wb) } txw.reset() + // increase the buffer version + txr.bufVersion++ } // txReadBuffer accesses buffered updates. 
-type txReadBuffer struct{ txBuffer } +type txReadBuffer struct { + txBuffer + // bufVersion is used to check if the buffer is modified recently + bufVersion uint64 +} + +// Version returns the current buffer version which is updated each time after +// a write transaction +func (txr *txReadBuffer) Version() uint64 { + return txr.bufVersion +} func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { if b := txr.buckets[string(bucketName)]; b != nil { @@ -94,6 +108,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer { txBuffer: txBuffer{ buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)), }, + bufVersion: 0, } for bucketName, bucket := range txr.txBuffer.buckets { txrCopy.txBuffer.buckets[bucketName] = bucket.Copy() @@ -114,7 +129,7 @@ type bucketBuffer struct { } func newBucketBuffer() *bucketBuffer { - return &bucketBuffer{buf: make([]kv, 512), used: 0} + return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0} } func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {