Skip to content

Commit

Permalink
[WIP] mvcc: full concurrent read
Browse files Browse the repository at this point in the history
  • Loading branch information
jingyih committed Mar 6, 2019
1 parent 8c228d6 commit 6bff469
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 30 deletions.
34 changes: 34 additions & 0 deletions mvcc/backend/backend.go
Expand Up @@ -51,6 +51,8 @@ var (
type Backend interface {
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx

Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
Expand Down Expand Up @@ -166,6 +168,7 @@ func newBackend(bcfg BackendConfig) *backend {
txBuffer: txBuffer{make(map[string]*bucketBuffer)},
},
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
},

stopc: make(chan struct{}),
Expand All @@ -187,6 +190,24 @@ func (b *backend) BatchTx() BatchTx {

func (b *backend) ReadTx() ReadTx { return b.readTx }

// ConcurrentReadTx creates and returns a new ReadTx, which:
// A) creates and keeps a copy of txReadBuffer in backend read Tx,
// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.mu.RLock()
defer b.readTx.mu.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done.
b.readTx.txWg.Add(1)
newTxReadBuf := copyTxReadBuffer(b.readTx.buf)
return &concurrentReadTx{
buf: newTxReadBuf,
tx: b.readTx.tx,
txMu: &b.readTx.txMu,
buckets: b.readTx.buckets,
txWg: b.readTx.txWg,
}
}

// ForceCommit forces the current batching tx to commit.
func (b *backend) ForceCommit() {
b.batchTx.Commit()
Expand Down Expand Up @@ -536,3 +557,16 @@ func (s *snapshot) Close() error {
<-s.donec
return s.Tx.Rollback()
}

func copyTxReadBuffer(from txReadBuffer) txReadBuffer {
to := txReadBuffer{txBuffer: txBuffer{buckets: make(map[string]*bucketBuffer)}}
for k, v := range from.txBuffer.buckets {
bufCopy := make([]kv, len(v.buf))
copy(bufCopy, v.buf)
to.txBuffer.buckets[k] = &bucketBuffer{
buf: bufCopy,
used: v.used,
}
}
return to
}
2 changes: 2 additions & 0 deletions mvcc/backend/backend_test.go
Expand Up @@ -300,6 +300,8 @@ func TestBackendWritebackForEach(t *testing.T) {
}
}

// TODO: add a unit test for concurrentReadTx

func cleanup(b Backend, path string) {
b.Close()
os.Remove(path)
Expand Down
21 changes: 14 additions & 7 deletions mvcc/backend/batch_tx.go
Expand Up @@ -290,13 +290,9 @@ func (t *batchTxBuffered) commit(stop bool) {

func (t *batchTxBuffered) unsafeCommit(stop bool) {
if t.backend.readTx.tx != nil {
if err := t.backend.readTx.tx.Rollback(); err != nil {
if t.backend.lg != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
// wait all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx
go waitAndRollback(t.backend.readTx.tx, t.backend.readTx.txWg, t.backend.lg)
t.backend.readTx.reset()
}

Expand All @@ -307,6 +303,17 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}
}

func waitAndRollback(tx *bolt.Tx, wg *sync.WaitGroup, lg *zap.Logger) {
wg.Wait()
if err := tx.Rollback(); err != nil {
if lg != nil {
lg.Fatal("failed to rollback tx", zap.Error(err))
} else {
plog.Fatalf("cannot rollback tx (%s)", err)
}
}
}

func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
t.batchTx.UnsafePut(bucketName, key, value)
t.buf.put(bucketName, key, value)
Expand Down
102 changes: 92 additions & 10 deletions mvcc/backend/read_tx.go
Expand Up @@ -40,10 +40,12 @@ type readTx struct {
mu sync.RWMutex
buf txReadBuffer

// txmu protects accesses to buckets and tx on Range requests.
txmu sync.RWMutex
// txMu protects accesses to buckets and tx on Range requests.
txMu sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
// txWg protects tx from being rolled back until all reads using this tx are done.
txWg *sync.WaitGroup
}

func (rt *readTx) Lock() { rt.mu.RLock() }
Expand All @@ -67,23 +69,23 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]

// find/cache bucket
bn := string(bucketName)
rt.txmu.RLock()
rt.txMu.RLock()
bucket, ok := rt.buckets[bn]
rt.txmu.RUnlock()
rt.txMu.RUnlock()
if !ok {
rt.txmu.Lock()
rt.txMu.Lock()
bucket = rt.tx.Bucket(bucketName)
rt.buckets[bn] = bucket
rt.txmu.Unlock()
rt.txMu.Unlock()
}

// ignore missing bucket since may have been created in this batch
if bucket == nil {
return keys, vals
}
rt.txmu.Lock()
rt.txMu.Lock()
c := bucket.Cursor()
rt.txmu.Unlock()
rt.txMu.Unlock()

k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
Expand All @@ -104,9 +106,9 @@ func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) err
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
return err
}
rt.txmu.Lock()
rt.txMu.Lock()
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
rt.txmu.Unlock()
rt.txMu.Unlock()
if err != nil {
return err
}
Expand All @@ -117,4 +119,84 @@ func (rt *readTx) reset() {
rt.buf.reset()
rt.buckets = make(map[string]*bolt.Bucket)
rt.tx = nil
rt.txWg = new(sync.WaitGroup)
}

// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
type concurrentReadTx struct {
buf txReadBuffer
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket // note: A map value is a pointer
txWg *sync.WaitGroup
}

func (rt *concurrentReadTx) Lock() {}

func (rt *concurrentReadTx) Unlock() {
rt.txWg.Done()
}

func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
dups := make(map[string]struct{})
getDups := func(k, v []byte) error {
dups[string(k)] = struct{}{}
return nil
}
visitNoDup := func(k, v []byte) error {
if _, ok := dups[string(k)]; ok {
return nil
}
return visitor(k, v)
}
if err := rt.buf.ForEach(bucketName, getDups); err != nil {
return err
}
rt.txMu.Lock()
err := unsafeForEach(rt.tx, bucketName, visitNoDup)
rt.txMu.Unlock()
if err != nil {
return err
}
return rt.buf.ForEach(bucketName, visitor)
}

func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
// forbid duplicates for single keys
limit = 1
}
if limit <= 0 {
limit = math.MaxInt64
}
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
panic("do not use unsafeRange on non-keys bucket")
}
keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}

// find/cache bucket
bn := string(bucketName)
rt.txMu.RLock()
bucket, ok := rt.buckets[bn]
rt.txMu.RUnlock()
if !ok {
rt.txMu.Lock()
bucket = rt.tx.Bucket(bucketName)
rt.buckets[bn] = bucket
rt.txMu.Unlock()
}

// ignore missing bucket since may have been created in this batch
if bucket == nil {
return keys, vals
}
rt.txMu.Lock()
c := bucket.Cursor()
rt.txMu.Unlock()

k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
}
58 changes: 47 additions & 11 deletions mvcc/kvstore_test.go
Expand Up @@ -645,30 +645,65 @@ func TestTxnPut(t *testing.T) {
}
}

func TestTxnBlockBackendForceCommit(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
func TestConcurrentReadAndWrite(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend() // TODO: do we need to make batch interval longer?
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)

txn := s.Read()
// write something to read later
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

// readTx simulates a long read request
readTx1 := s.Read()

// write should not be blocked by reads
done := make(chan struct{})
go func() {
s.b.ForceCommit()
s.Put([]byte("foo"), []byte("newBar"), lease.NoLease) // this is a write Txn
done <- struct{}{}
}()
select {
case <-done:
t.Fatalf("failed to block ForceCommit")
case <-time.After(100 * time.Millisecond):
case <-time.After(1 * time.Second):
t.Fatalf("write should not be blocked by read")
}

txn.End()
select {
case <-done:
case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO
testutil.FatalStack(t, "failed to execute ForceCommit")
// readTx2 simulates a short read request
readTx2 := s.Read()
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
ret, err := readTx2.Range([]byte("foo"), nil, ro)
if err != nil {
t.Fatalf("failed to range: %v", err)
}
// readTx2 should see the result of new write
w := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("newBar"),
CreateRevision: 2,
ModRevision: 3,
Version: 2,
}
if !reflect.DeepEqual(ret.KVs[0], w) {
t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
}
readTx2.End()

ret, err = readTx1.Range([]byte("foo"), nil, ro)
if err != nil {
t.Fatalf("failed to range: %v", err)
}
// readTx1 should not see the result of new write
w = mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 2,
ModRevision: 2,
Version: 1,
}
if !reflect.DeepEqual(ret.KVs[0], w) {
t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
}
readTx1.End()
}

// TODO: test attach key to lessor
Expand Down Expand Up @@ -752,6 +787,7 @@ type fakeBackend struct {

func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) SizeInUse() int64 { return 0 }
Expand Down
3 changes: 1 addition & 2 deletions mvcc/kvstore_txn.go
Expand Up @@ -31,9 +31,8 @@ type storeTxnRead struct {

func (s *store) Read() TxnRead {
s.mu.RLock()
tx := s.b.ReadTx()
s.revMu.RLock()
tx.Lock()
tx := s.b.ConcurrentReadTx()
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
Expand Down

0 comments on commit 6bff469

Please sign in to comment.