Skip to content

Commit

Permalink
pkg: create package traceutil for tracing. mvcc: add tracing
Browse files Browse the repository at this point in the history
steps: range from the in-memory index tree; range from boltdb.
etcdserver: add tracing steps: agreement among raft nodes before
linearized reading; authentication; filter and sort kv pairs; assemble
the response.
  • Loading branch information
YoyinZyc committed Sep 30, 2019
1 parent 594005d commit f4e7fc5
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 28 deletions.
23 changes: 16 additions & 7 deletions etcdserver/apply.go
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.etcd.io/etcd/pkg/types"

"github.com/gogo/protobuf/proto"
Expand All @@ -50,7 +51,7 @@ type applierV3 interface {
Apply(r *pb.InternalRaftRequest) *applyResult

Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
Expand Down Expand Up @@ -119,7 +120,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
case r.Range != nil:
ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
case r.Put != nil:
ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
case r.DeleteRange != nil:
Expand Down Expand Up @@ -245,12 +246,18 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
return resp, nil
}

func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace, ok := ctx.Value("trace").(*traceutil.Trace)
if !ok || trace == nil {
trace = traceutil.New("Apply Range")
ctx = context.WithValue(ctx, "trace", trace)
}

resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

if txn == nil {
txn = a.s.kv.Read()
txn = a.s.kv.Read(trace)
defer txn.End()
}

Expand Down Expand Up @@ -327,7 +334,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}

trace.Step("Filter and sort the key-value pairs.")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
Expand All @@ -337,12 +344,14 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("Assemble the response.")
return resp, nil
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
isWrite := !isTxnReadonly(rt)
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
trace := traceutil.New("ReadOnlyTxn")
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))

txnPath := compareToPath(txn, rt)
if isWrite {
Expand Down Expand Up @@ -516,7 +525,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
respi := tresp.Responses[i].Response
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
resp, err := a.Range(txn, tv.RequestRange)
resp, err := a.Range(context.TODO(), txn, tv.RequestRange)
if err != nil {
if lg != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
Expand Down
5 changes: 3 additions & 2 deletions etcdserver/apply_auth.go
Expand Up @@ -15,6 +15,7 @@
package etcdserver

import (
"context"
"sync"

"go.etcd.io/etcd/auth"
Expand Down Expand Up @@ -83,11 +84,11 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon
return aa.applierV3.Put(txn, r)
}

func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
return nil, err
}
return aa.applierV3.Range(txn, r)
return aa.applierV3.Range(ctx, txn, r)
}

func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/corrupt.go
Expand Up @@ -386,7 +386,7 @@ func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResp
return nil, ErrCorrupt
}

func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) {
return nil, ErrCorrupt
}

Expand Down
14 changes: 9 additions & 5 deletions etcdserver/util.go
Expand Up @@ -24,6 +24,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/rafthttp"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.etcd.io/etcd/pkg/types"

"go.uber.org/zap"
Expand Down Expand Up @@ -108,7 +109,7 @@ func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Strin
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err)
warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "", resp, err)
}

func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
Expand All @@ -126,18 +127,18 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR
}
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
warnOfExpensiveGenericRequest(lg, nil, now, reqStringer, "read-only range ", resp, err)
}

func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
var resp string
if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
warnOfExpensiveGenericRequest(lg, trace, now, reqStringer, "read-only range ", resp, err)
}

func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
func warnOfExpensiveGenericRequest(lg *zap.Logger, trace *traceutil.Trace, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
d := time.Since(now)
if d > warnApplyDuration {
if lg != nil {
Expand All @@ -159,6 +160,9 @@ func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fm
}
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
}
if trace != nil {
trace.Log(lg)
}
slowApplies.Inc()
}
}
Expand Down
13 changes: 11 additions & 2 deletions etcdserver/v3_server.go
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/lease/leasehttp"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/pkg/traceutil"
"go.etcd.io/etcd/raft"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -85,14 +86,18 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.New("Range")
ctx = context.WithValue(ctx, "trace", trace)

var resp *pb.RangeResponse
var err error
defer func(start time.Time) {
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err)
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), trace, start, r, resp, err)
}(time.Now())

if !r.Serializable {
err = s.linearizableReadNotify(ctx)
trace.Step("Agreement among raft nodes before linearized reading.")
if err != nil {
return nil, err
}
Expand All @@ -101,7 +106,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}

get := func() { resp, err = s.applyV3Base.Range(nil, r) }
get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
Expand Down Expand Up @@ -558,6 +563,10 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e
if err = chk(ai); err != nil {
return err
}

if trace, ok := ctx.Value("trace").(*traceutil.Trace); ok && trace != nil {
trace.Step("Authentication.")
}
// fetch response for serialized request
get()
// check for stale token revision in case the auth store was updated while
Expand Down
3 changes: 2 additions & 1 deletion mvcc/kv.go
Expand Up @@ -18,6 +18,7 @@ import (
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
)

type RangeOptions struct {
Expand Down Expand Up @@ -102,7 +103,7 @@ type KV interface {
WriteView

// Read creates a read transaction.
Read() TxnRead
Read(trace *traceutil.Trace) TxnRead

// Write creates a write transaction.
Write() TxnWrite
Expand Down
2 changes: 1 addition & 1 deletion mvcc/kv_test.go
Expand Up @@ -47,7 +47,7 @@ var (
return kv.Range(key, end, ro)
}
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
txn := kv.Read()
txn := kv.Read(nil)
defer txn.End()
return txn.Range(key, end, ro)
}
Expand Down
6 changes: 3 additions & 3 deletions mvcc/kv_view.go
Expand Up @@ -19,19 +19,19 @@ import "go.etcd.io/etcd/lease"
// readView wraps a KV. Each of its methods (FirstRev, Rev, Range) opens a
// read transaction via kv.Read, delegates the call to that transaction, and
// ends the transaction before returning.
type readView struct{ kv KV }

func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read()
tr := rv.kv.Read(nil)
defer tr.End()
return tr.FirstRev()
}

func (rv *readView) Rev() int64 {
tr := rv.kv.Read()
tr := rv.kv.Read(nil)
defer tr.End()
return tr.Rev()
}

func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read()
tr := rv.kv.Read(nil)
defer tr.End()
return tr.Range(key, end, ro)
}
Expand Down
6 changes: 3 additions & 3 deletions mvcc/kvstore_test.go
Expand Up @@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

// readTx simulates a long read request
readTx1 := s.Read()
readTx1 := s.Read(nil)

// write should not be blocked by reads
done := make(chan struct{})
Expand All @@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
}

// readTx2 simulates a short read request
readTx2 := s.Read()
readTx2 := s.Read(nil)
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
ret, err := readTx2.Range([]byte("foo"), nil, ro)
if err != nil {
Expand Down Expand Up @@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
mu.Lock()
wKVs := make(kvs, len(committedKVs))
copy(wKVs, committedKVs)
tx := s.Read()
tx := s.Read(nil)
mu.Unlock()
// get all keys in backend store, and compare with wKVs
ret, err := tx.Range([]byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
Expand Down
15 changes: 12 additions & 3 deletions mvcc/kvstore_txn.go
Expand Up @@ -18,6 +18,7 @@ import (
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
)

Expand All @@ -27,9 +28,11 @@ type storeTxnRead struct {

firstRev int64
rev int64

trace *traceutil.Trace
}

func (s *store) Read() TxnRead {
func (s *store) Read(trace *traceutil.Trace) TxnRead {
s.mu.RLock()
s.revMu.RLock()
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
Expand All @@ -38,7 +41,7 @@ func (s *store) Read() TxnRead {
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
}

// FirstRev returns the firstRev value stored on this read transaction.
func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
Expand Down Expand Up @@ -66,7 +69,7 @@ func (s *store) Write() TxnWrite {
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0},
storeTxnRead: storeTxnRead{s, tx, 0, 0, nil},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
Expand Down Expand Up @@ -124,6 +127,9 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
}

revpairs := tr.s.kvindex.Revisions(key, end, rev)
if tr.trace != nil {
tr.trace.Step("Range keys from in-memory index tree.")
}
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
}
Expand Down Expand Up @@ -163,6 +169,9 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
}
}
}
if tr.trace != nil {
tr.trace.Step("Range keys from bolt db.")
}
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}

Expand Down

0 comments on commit f4e7fc5

Please sign in to comment.