Skip to content

Commit

Permalink
Merge pull request #10523 from jingyih/fully_concurrent_reads
Browse files Browse the repository at this point in the history
mvcc: fully concurrent read
  • Loading branch information
xiang90 committed Jun 14, 2019
2 parents 0de9b8a + 55066eb commit 2c5162a
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 47 deletions.
46 changes: 32 additions & 14 deletions integration/metrics_test.go
Expand Up @@ -16,6 +16,7 @@ package integration

import (
"context"
"fmt"
"net/http"
"strconv"
"testing"
Expand Down Expand Up @@ -103,22 +104,39 @@ func testMetricDbSizeDefrag(t *testing.T, name string) {
t.Fatal(kerr)
}

// Put to move PendingPages to FreePages
if _, err = kvc.Put(context.TODO(), putreq); err != nil {
t.Fatal(err)
}
time.Sleep(500 * time.Millisecond)
validateAfterCompactionInUse := func() error {
// Put to move PendingPages to FreePages
if _, err = kvc.Put(context.TODO(), putreq); err != nil {
t.Fatal(err)
}
time.Sleep(500 * time.Millisecond)

afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes")
if err != nil {
t.Fatal(err)
}
aciu, err := strconv.Atoi(afterCompactionInUse)
if err != nil {
t.Fatal(err)
afterCompactionInUse, err := clus.Members[0].Metric("etcd_mvcc_db_total_size_in_use_in_bytes")
if err != nil {
t.Fatal(err)
}
aciu, err := strconv.Atoi(afterCompactionInUse)
if err != nil {
t.Fatal(err)
}
if biu <= aciu {
return fmt.Errorf("expected less than %d, got %d after compaction", biu, aciu)
}
return nil
}
if biu <= aciu {
t.Fatalf("expected less than %d, got %d after compaction", biu, aciu)

// The backend rolls back the read transaction asynchronously (PR #10523),
// which can make this check flaky, so retry up to 3 times.
maxRetry, retry := 3, 0
for {
err := validateAfterCompactionInUse()
if err == nil {
break
}
retry++
if retry >= maxRetry {
t.Fatalf(err.Error())
}
}

// defrag should give freed space back to fs
Expand Down
34 changes: 33 additions & 1 deletion mvcc/backend/backend.go
Expand Up @@ -49,8 +49,11 @@ var (
)

type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx

Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
Expand All @@ -63,6 +66,8 @@ type Backend interface {
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
Expand All @@ -87,6 +92,8 @@ type backend struct {
sizeInUse int64
// commits counts number of commits since start
commits int64
// openReadTxN is the number of currently open read transactions in the backend
openReadTxN int64

mu sync.RWMutex
db *bolt.DB
Expand Down Expand Up @@ -166,6 +173,7 @@ func newBackend(bcfg BackendConfig) *backend {
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
},
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
},

stopc: make(chan struct{}),
Expand All @@ -187,6 +195,24 @@ func (b *backend) BatchTx() BatchTx {

// ReadTx returns the backend's own readTx. Every call returns the same
// underlying object; no per-call copy of the read buffer is made.
func (b *backend) ReadTx() ReadTx {
	return b.readTx
}

// ConcurrentReadTx builds and returns a new ReadTx which:
// A) owns a private copy of backend.readTx's txReadBuffer, and
// B) shares the boltdb read Tx, its bucket cache, the mutex guarding them and
// the WaitGroup of the current batch interval with backend.readTx.
//
// backend.readTx is only read-locked for the duration of this call; the
// returned transaction is registered on txWg here and is released again when
// its RUnlock is called.
func (b *backend) ConcurrentReadTx() ReadTx {
	src := b.readTx
	src.RLock()
	defer src.RUnlock()

	// Prevent the boltdb read Tx from being rolled back until the store read Tx
	// is done. Needs to be called while holding readTx.RLock().
	src.txWg.Add(1)

	// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
	crt := &concurrentReadTx{
		buf:     src.buf.unsafeCopy(),
		tx:      src.tx,
		txMu:    &src.txMu,
		buckets: src.buckets,
		txWg:    src.txWg,
	}
	return crt
}

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
b.batchTx.Commit()
Expand Down Expand Up @@ -491,8 +517,10 @@ func (b *backend) begin(write bool) *bolt.Tx {

size := tx.Size()
db := tx.DB()
stats := db.Stats()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))

return tx
}
Expand All @@ -509,6 +537,10 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx {
return tx
}

// OpenReadTxN returns the number of open read transactions as last recorded
// in b.openReadTxN. The counter is read atomically, so no backend lock is
// needed.
func (b *backend) OpenReadTxN() int64 {
	n := atomic.LoadInt64(&b.openReadTxN)
	return n
}

// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
Expand Down
29 changes: 29 additions & 0 deletions mvcc/backend/backend_test.go
Expand Up @@ -250,6 +250,35 @@ func TestBackendWriteback(t *testing.T) {
}
}

// TestConcurrentReadTx ensures that a concurrent read transaction can see all
// prior writes stored in the read buffer, including an overwrite made by a
// later write transaction.
func TestConcurrentReadTx(t *testing.T) {
	be, dir := NewTmpBackend(time.Hour, 10000)
	defer cleanup(be, dir)

	bucket := []byte("key")

	// First write transaction: create the bucket and seed two keys.
	first := be.BatchTx()
	first.Lock()
	first.UnsafeCreateBucket(bucket)
	first.UnsafePut(bucket, []byte("abc"), []byte("ABC"))
	first.UnsafePut(bucket, []byte("overwrite"), []byte("1"))
	first.Unlock()

	// Second write transaction: add a key and overwrite an existing one.
	second := be.BatchTx()
	second.Lock()
	second.UnsafePut(bucket, []byte("def"), []byte("DEF"))
	second.UnsafePut(bucket, []byte("overwrite"), []byte("2"))
	second.Unlock()

	reader := be.ConcurrentReadTx()
	reader.RLock() // no-op
	gotKeys, gotVals := reader.UnsafeRange(bucket, []byte("abc"), []byte("\xff"), 0)
	reader.RUnlock()

	wantKeys := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
	wantVals := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
	if !(reflect.DeepEqual(wantKeys, gotKeys) && reflect.DeepEqual(wantVals, gotVals)) {
		t.Errorf("want k=%+v, v=%+v; got k=%+v, v=%+v", wantKeys, wantVals, gotKeys, gotVals)
	}
}

// TestBackendWritebackForEach checks that partially written / buffered
// data is visited in the same order as fully committed data.
func TestBackendWritebackForEach(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions mvcc/backend/batch_tx.go
Expand Up @@ -306,13 +306,18 @@ func (t *batchTxBuffered) commit(stop bool) {

func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
// wait for all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait()
if err := tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
}
}(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset()
}

Expand Down
106 changes: 96 additions & 10 deletions mvcc/backend/read_tx.go
Expand Up @@ -42,10 +42,13 @@ type readTx struct {
mu sync.RWMutex
buf txReadBuffer

// txmu protects accesses to buckets and tx on Range requests.
txmu sync.RWMutex
// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
// txMu protects accesses to buckets and tx on Range requests.
txMu sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
txWg *sync.WaitGroup
}

func (rt *readTx) Lock() { rt.mu.Lock() }
Expand All @@ -71,23 +74,23 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]

// find/cache bucket
bn := string(bucketName)
rt.txmu.RLock()
rt.txMu.RLock()
bucket, ok := rt.buckets[bn]
rt.txmu.RUnlock()
rt.txMu.RUnlock()
if !ok {
rt.txmu.Lock()
rt.txMu.Lock()
bucket = rt.tx.Bucket(bucketName)
rt.buckets[bn] = bucket
rt.txmu.Unlock()
rt.txMu.Unlock()
}

// ignore missing bucket since may have been created in this batch
if bucket == nil {
return keys, vals
}
rt.txmu.Lock()
rt.txMu.Lock()
c := bucket.Cursor()
rt.txmu.Unlock()
rt.txMu.Unlock()

k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
Expand All @@ -108,9 +111,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
return err
}
rt.txmu.Lock()
rt.txMu.Lock()
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
rt.txmu.Unlock()
rt.txMu.Unlock()
if err != nil {
return err
}
Expand All @@ -121,4 +124,87 @@ func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[string]*bolt.Bucket)
rt.tx = nil
rt.txWg = new(sync.WaitGroup)
}

// concurrentReadTx is the read transaction handed out by
// backend.ConcurrentReadTx. It reads from its own copy of the read buffer and
// from the boltdb read Tx it shares with backend.readTx.
//
// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
type concurrentReadTx struct {
// buf is this transaction's own copy of the backend read buffer, taken when
// the transaction is created.
buf txReadBuffer
// txMu points at backend.readTx's txMu and guards accesses to tx and buckets.
txMu *sync.RWMutex
// tx is the boltdb read Tx that backend.readTx held when this transaction
// was created.
tx *bolt.Tx
// buckets is the bucket cache shared with backend.readTx for the same tx.
buckets map[string]*bolt.Bucket
// txWg is the WaitGroup on which this transaction was registered at
// creation; RUnlock marks it done.
txWg *sync.WaitGroup
}

// Lock is a no-op for concurrentReadTx.
func (rt *concurrentReadTx) Lock() {
	// Intentionally empty.
}
// Unlock is a no-op for concurrentReadTx, matching the no-op Lock.
func (rt *concurrentReadTx) Unlock() {
	// Intentionally empty.
}

// RLock does nothing: once a concurrentReadTx has been created it does not
// need to be locked.
func (rt *concurrentReadTx) RLock() {
	// Intentionally empty.
}

// RUnlock signals the end of the concurrentReadTx by calling Done on txWg.
// As with any sync.WaitGroup, calling Done more often than Add panics.
func (rt *concurrentReadTx) RUnlock() {
	rt.txWg.Done()
}

// UnsafeForEach calls visitor for the key/value pairs of bucketName.
//
// It works in three steps:
//  1. walk the read buffer once to record which keys are buffered;
//  2. walk the boltdb tx under txMu, skipping every key recorded in step 1;
//  3. walk the read buffer again, this time passing each entry to visitor.
//
// The first error returned by any step is returned to the caller.
func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
	buffered := make(map[string]struct{})
	if err := rt.buf.ForEach(bucketName, func(k, _ []byte) error {
		buffered[string(k)] = struct{}{}
		return nil
	}); err != nil {
		return err
	}

	// Skip keys from the boltdb tx that are also present in the buffer.
	skipBuffered := func(k, v []byte) error {
		if _, dup := buffered[string(k)]; dup {
			return nil
		}
		return visitor(k, v)
	}

	rt.txMu.Lock()
	txErr := unsafeForEach(rt.tx, bucketName, skipBuffered)
	rt.txMu.Unlock()
	if txErr != nil {
		return txErr
	}

	return rt.buf.ForEach(bucketName, visitor)
}

// UnsafeRange returns the keys and values of bucketName selected by key and
// endKey, reading first from the buffer copy and then from the boltdb tx.
//
// A nil endKey forces limit to 1 (single-key lookup); a limit <= 0 means no
// limit. It panics when more than one result is requested from a bucket other
// than safeRangeBucket. If the buffer alone already yields limit results they
// are returned directly; otherwise the boltdb results are returned followed
// by the buffered ones.
func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	switch {
	case endKey == nil:
		// forbid duplicates for single keys
		limit = 1
	case limit <= 0:
		limit = math.MaxInt64
	}
	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
		panic("do not use unsafeRange on non-keys bucket")
	}

	bufKeys, bufVals := rt.buf.Range(bucketName, key, endKey, limit)
	if int64(len(bufKeys)) == limit {
		return bufKeys, bufVals
	}

	// Look the bucket up in the shared cache; on a miss, fetch it from the
	// boltdb tx and cache whatever was found.
	name := string(bucketName)
	rt.txMu.RLock()
	bkt, cached := rt.buckets[name]
	rt.txMu.RUnlock()
	if !cached {
		rt.txMu.Lock()
		bkt = rt.tx.Bucket(bucketName)
		rt.buckets[name] = bkt
		rt.txMu.Unlock()
	}

	// ignore missing bucket since may have been created in this batch
	if bkt == nil {
		return bufKeys, bufVals
	}

	rt.txMu.Lock()
	cur := bkt.Cursor()
	rt.txMu.Unlock()

	remaining := limit - int64(len(bufKeys))
	txKeys, txVals := unsafeRange(cur, key, endKey, remaining)
	return append(txKeys, bufKeys...), append(txVals, bufVals...)
}
22 changes: 22 additions & 0 deletions mvcc/backend/tx_buffer.go
Expand Up @@ -88,6 +88,19 @@ func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) er
return nil
}

// unsafeCopy returns a copy of txReadBuffer in which every bucket buffer has
// been duplicated via bucketBuffer.Copy. It takes no lock itself; the caller
// should hold backend.readTx.RLock().
func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
	src := txr.txBuffer.buckets
	dup := make(map[string]*bucketBuffer, len(src))
	for name, bb := range src {
		dup[name] = bb.Copy()
	}
	return txReadBuffer{
		txBuffer: txBuffer{buckets: dup},
	}
}

type kv struct {
key []byte
val []byte
Expand Down Expand Up @@ -179,3 +192,12 @@ func (bb *bucketBuffer) Less(i, j int) bool {
return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
}
// Swap exchanges the entries at positions i and j of the buffer.
func (bb *bucketBuffer) Swap(i, j int) {
	bb.buf[j], bb.buf[i] = bb.buf[i], bb.buf[j]
}

// Copy returns a new bucketBuffer with the same used count and its own buf
// slice of the same length. The kv entries are copied by value, so the key
// and val byte slices inside them still refer to the same underlying bytes
// as the original.
func (bb *bucketBuffer) Copy() *bucketBuffer {
	entries := make([]kv, len(bb.buf))
	copy(entries, bb.buf)
	return &bucketBuffer{
		buf:  entries,
		used: bb.used,
	}
}
3 changes: 3 additions & 0 deletions mvcc/kvstore.go
Expand Up @@ -354,6 +354,9 @@ func (s *store) restore() error {
reportDbTotalSizeInUseInBytesMu.Lock()
reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
reportDbTotalSizeInUseInBytesMu.Unlock()
reportDbOpenReadTxNMu.Lock()
reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
reportDbOpenReadTxNMu.Unlock()

min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
Expand Down

0 comments on commit 2c5162a

Please sign in to comment.